Skip to content
This repository has been archived by the owner on Feb 3, 2023. It is now read-only.

Fix for Async Zome Calls #2193

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions app_spec/test/files/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,20 @@ module.exports = scenario => {
const result = await alice.call('app', 'blog', 'ping', params)
t.deepEqual(result, { Ok: { msg_type: 'response', body: `got hello from ${alice.info('app').agentAddress}` } })
})

scenario('multiple zome calls', async (s, t) => {
const { alice, bob } = await s.players({ alice: one, bob: one }, true)
const params = { to_agent: bob.info('app').agentAddress, message: 'hello' }

// shut down bob so ping to bob will timeout to complete
await bob.kill()
let results = []
const f1 = alice.call('app', 'blog', 'ping', params).then(r => {results.push(2)})
const f2 = alice.call('app',"blog", "get_test_properties", {}).then(r => {results.push(1)})
await Promise.all([f1,f2])

// prove that show_env returned before ping
t.deepEqual(results,[1,2])

})
}
2 changes: 1 addition & 1 deletion app_spec/zomes/blog/code/src/blog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub fn handle_ping(to_agent: Address, message: String) -> ZomeApiResult<JsonStri
"body" : message
})
.to_string();
let received_str = hdk::send(to_agent, json_msg, 10000.into())?;
let received_str = hdk::send(to_agent, json_msg, 20000.into())?;
Ok(JsonString::from_json(&received_str))
}

Expand Down
1 change: 1 addition & 0 deletions crates/conductor_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ nickel = "=0.11.0"
url = { version = "=2.1.0", features = ["serde"] }
snowflake = "=1.3.0"
newrelic = { version = "=0.2.2", optional = true }
tokio = "=0.1.22"

[dev-dependencies]
test_utils = { version = "=0.0.49-alpha1", path = "../../test_utils" }
Expand Down
5 changes: 4 additions & 1 deletion crates/conductor_lib/src/interface_impls/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crossbeam_channel::Receiver;
use jsonrpc_core::IoHandler;
use jsonrpc_http_server::ServerBuilder;
use std::{net::SocketAddr, thread};
use tokio::runtime::Runtime;

pub struct HttpInterface {
port: u16,
Expand Down Expand Up @@ -30,8 +31,9 @@ impl Interface for HttpInterface {
kill_switch: Receiver<()>,
) -> Result<(Broadcaster, thread::JoinHandle<()>), String> {
let url = format!("0.0.0.0:{}", self.port);

let runtime = Runtime::new().map_err(|e| e.to_string())?;
let server = ServerBuilder::new(handler)
.event_loop_executor(runtime.executor())
.start_http(&url.parse().expect("Invalid URL!"))
.map_err(|e| e.to_string())?;
self.bound_address = Some(*server.address());
Expand All @@ -40,6 +42,7 @@ impl Interface for HttpInterface {
.name(format!("http_interface/{}", url))
.spawn(move || {
let _ = server; // move `server` into this thread
let _ = runtime; // move tokio runtime for RPC futures into this thread
let _ = kill_switch.recv();
})
.expect("Could not spawn thread for HTTP interface");
Expand Down
4 changes: 4 additions & 0 deletions crates/conductor_lib/src/interface_impls/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crossbeam_channel::Receiver;
use jsonrpc_core::IoHandler;
use jsonrpc_ws_server::ServerBuilder;
use std::{net::SocketAddr, thread};
use tokio::runtime::Runtime;

pub struct WebsocketInterface {
port: u16,
Expand Down Expand Up @@ -30,7 +31,9 @@ impl Interface for WebsocketInterface {
kill_switch: Receiver<()>,
) -> Result<(Broadcaster, thread::JoinHandle<()>), String> {
let url = format!("0.0.0.0:{}", self.port);
let runtime = Runtime::new().map_err(|e| e.to_string())?;
let server = ServerBuilder::new(handler)
.event_loop_executor(runtime.executor())
.start(&url.parse().expect("Invalid URL!"))
.map_err(|e| e.to_string())?;
self.bound_address = Some(*server.addr());
Expand All @@ -39,6 +42,7 @@ impl Interface for WebsocketInterface {
.name(format!("websocket_interface/{}", url))
.spawn(move || {
let _ = server; // move `server` into this thread
let _ = runtime; // move tokio runtime for RPC futures into this thread
let _ = kill_switch.recv();
})
.expect("Could not spawn thread for websocket interface");
Expand Down