Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TOP pool synchronization #2211

Merged
merged 36 commits into from
Nov 24, 2023
Merged
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
8da59e6
first working version
kziemianek Oct 9, 2023
866b341
Merge branch 'dev' into 1935-broadcast-top
kziemianek Oct 17, 2023
98203b5
small tweaks
kziemianek Oct 17, 2023
8b0dddc
updating peers
kziemianek Oct 19, 2023
65c5a0d
trusted call broadcasting tweaks
kziemianek Oct 20, 2023
0769dcc
remove broadcasted request encryption/decryption
kziemianek Oct 23, 2023
57026c6
test
kziemianek Oct 23, 2023
00e96fb
add more unit tests
kziemianek Oct 23, 2023
9827919
cargo cleanup
kziemianek Oct 23, 2023
acdea60
fix
kziemianek Oct 24, 2023
4fc0ca3
small adjustments
kziemianek Oct 24, 2023
3384f21
Merge branch 'dev' into p-100-linear-processing-of-di-requests
kziemianek Oct 24, 2023
03ae3d9
adjust waiting time and add comment
kziemianek Oct 26, 2023
02b4521
one thread per trusted rpc ws client
kziemianek Oct 27, 2023
da2793d
Merge branch 'dev' into p-100-linear-processing-of-di-requests
kziemianek Oct 27, 2023
29f8d38
add/remove peers
kziemianek Oct 28, 2023
5e4fd97
Merge branch 'dev' into p-100-linear-processing-of-di-requests
kziemianek Oct 28, 2023
51fa858
restore peer connections
kziemianek Oct 30, 2023
c5935a4
broadcast aes trusted calls
kziemianek Nov 2, 2023
fe6ab25
Merge branch 'dev' into p-100-linear-processing-of-di-requests
kziemianek Nov 3, 2023
724d736
broadcast stf-task created trusted calls
kziemianek Nov 6, 2023
97ca7bb
Merge branch 'dev' into p-100-linear-processing-of-di-requests
kziemianek Nov 7, 2023
470a985
remove mutex from DirectRpcClient
kziemianek Nov 7, 2023
702c3d8
Merge branch 'dev' into p-100-linear-processing-of-di-requests
kziemianek Nov 8, 2023
95f1a53
Merge branch 'dev' into p-100-linear-processing-of-di-requests
kziemianek Nov 13, 2023
d26429f
review suggestions
kziemianek Nov 13, 2023
6aa26e3
Merge branch 'dev' into p-100-linear-processing-of-di-requests
kziemianek Nov 13, 2023
0275af1
Merge branch 'dev' into p-100-linear-processing-of-di-requests
kziemianek Nov 14, 2023
95aad63
Merge branch 'dev' into p-100-linear-processing-of-di-requests
Kailai-Wang Nov 14, 2023
1dade5b
Merge branch 'dev' into p-100-linear-processing-of-di-requests
kziemianek Nov 15, 2023
c0a3beb
Merge branch 'dev' into p-100-linear-processing-of-di-requests
kziemianek Nov 16, 2023
79d909e
Merge branch 'dev' into p-100-linear-processing-of-di-requests
kziemianek Nov 22, 2023
f77057c
move broadcasting to Author
kziemianek Nov 24, 2023
3c9c6ed
Merge branch 'dev' into p-100-linear-processing-of-di-requests
kziemianek Nov 24, 2023
a010e7f
add author_tests
kziemianek Nov 24, 2023
d303863
Merge branch 'dev' into p-100-linear-processing-of-di-requests
kziemianek Nov 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove mutex from DirectRpcClient
  • Loading branch information
kziemianek committed Nov 7, 2023
commit 470a9851aa32f980cd29e85b27acea77e78a3f02
57 changes: 22 additions & 35 deletions tee-worker/core/direct-rpc-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -36,12 +36,6 @@ use crate::sgx_reexport_prelude::*;

extern crate alloc;

#[cfg(feature = "sgx")]
use std::sync::SgxMutex as Mutex;

#[cfg(feature = "std")]
use std::sync::Mutex;

use alloc::format;

use core::str::FromStr;
@@ -61,7 +55,10 @@ use std::{
error::Error,
net::TcpStream,
string::{String, ToString},
sync::{mpsc::SyncSender, Arc},
sync::{
mpsc::{channel, Sender, SyncSender},
Arc,
},
time::Duration,
vec,
vec::Vec,
@@ -129,11 +126,10 @@ impl RpcClientFactory for DirectRpcClientFactory {

pub trait RpcClient {
fn send(&mut self, request_id: String, params: RequestParams) -> Result<(), Box<dyn Error>>;
fn is_alive(&self) -> bool;
}

pub struct DirectRpcClient {
ws: Arc<Mutex<WebSocket<MaybeTlsStream<TcpStream>>>>,
requests_sink: Sender<String>,
}

impl DirectRpcClient {
@@ -153,27 +149,30 @@ impl DirectRpcClient {
client_tls_with_config(ws_server_url, stream, None, Some(connector))
.map_err(|e| format!("Could not open websocket connection: {:?}", e))?;

let (request_sender, requests_receiver) = channel();

//it fails to perform handshake in non_blocking mode so we are setting it up after the handshake is performed
Self::switch_to_non_blocking(&mut socket);

let socket = Arc::new(Mutex::new(socket));
let cloned_socket = socket.clone();

std::thread::spawn(move || loop {
if let Ok(mut socket) = cloned_socket.lock() {
if let Ok(message) = socket.read_message() {
if let Ok(Some(response)) = Self::handle_ws_message(message) {
if let Err(e) = responses_sink.send(response) {
log::error!("Could not forward response, reason: {:?}", e)
};
}
// let's flush all pending requests first
while let Ok(request) = requests_receiver.try_recv() {
socket.write_message(Message::Text(request)).unwrap()
}

if let Ok(message) = socket.read_message() {
if let Ok(Some(response)) = Self::handle_ws_message(message) {
if let Err(e) = responses_sink.send(response) {
log::error!("Could not forward response, reason: {:?}", e)
};
}
}
std::thread::sleep(Duration::from_millis(10))
});

debug!("Connected to peer: {}", url);
Ok(Self { ws: socket })

Ok(Self { requests_sink: request_sender })
}

fn switch_to_non_blocking(socket: &mut WebSocket<MaybeTlsStream<TcpStream>>) {
@@ -259,21 +258,9 @@ impl RpcClient for DirectRpcClient {
RequestParams::Rsa(params) => self.prepare_rsa_request(request_id, params)?,
RequestParams::Aes(params) => self.prepare_aes_request(request_id, params)?,
};

if let Ok(mut ws) = self.ws.lock() {
ws.write_message(Message::Text(request))
.map_err(|e| format!("Could not write message, reason: {:?}", e).into())
} else {
Ok(())
}
}

fn is_alive(&self) -> bool {
if let Ok(ws) = self.ws.lock() {
ws.can_write()
} else {
false
}
self.requests_sink
.send(request)
.map_err(|e| format!("Could not write message, reason: {:?}", e).into())
}
}

15 changes: 4 additions & 11 deletions tee-worker/core/peer-top-broadcaster/src/lib.rs
Original file line number Diff line number Diff line change
@@ -226,7 +226,8 @@ where
ClientFactory: RpcClientFactory,
{
// created new map filled with rpc clients connected to peer from the provided list. Reuses existing
// connections
// connections. The list will not containt peers that are unreachable, so following logic will automatically
// remove all dead connections
fn update(&self, peers: Vec<String>) {
log::debug!("Updating peers: {:?}", &peers);
let mut new_peers_list = self.new_clear_peer_map();
@@ -236,14 +237,10 @@ where
log::info!("Adding a peer: {}", peer.clone());
self.connect_to(&peer, &mut new_peers_list)
} else {
log::info!("Reusing existing peer: {}", peer.clone());
//this is safe as we previously ensured that map contains such key
let peer_to_move = peers.remove(&peer).unwrap();

if peer_to_move.is_alive() {
new_peers_list.insert(peer, peer_to_move);
} else {
self.connect_to(&peer, &mut new_peers_list);
}
new_peers_list.insert(peer, peer_to_move);
}
}
}
@@ -286,10 +283,6 @@ pub mod tests {
self.sent_requests = self.sent_requests + 1;
Ok(())
}

fn is_alive(&self) -> bool {
true
}
}

impl MockedRpcClient {