-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement graph binary protocol #217
base: master
Are you sure you want to change the base?
Changes from 38 commits
153b6cd
076949f
cbaa2bf
946d132
dce5400
e426d0d
6cfc168
97a3de6
0a39163
36a9989
c810c07
6de4156
5330d8f
ee6086f
f273b14
5b5108c
678792f
4ca0eb5
455e329
4a42319
32b47ac
815767a
753dbf7
0368454
54b2457
35d22ed
490e925
69da91e
6db1750
e570908
f2880a0
2492d09
e9a0c6d
baccdd5
6626be3
d07679e
fe2c0de
2083461
e2cbd2c
fb3bf91
1bd393d
0bd9f9a
ae34c00
aacd284
f54ea77
85a58f1
a1bbe79
b4d9015
5a4931b
97ca6c1
2d2b13a
081ad4e
5fb1adb
03cb90a
49799e2
766f045
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In general all the changes here and in the blocking client are to encapsulate the differences between the serializers so the binary protocol can be easily called from the same code path as the graphson protocols |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
use crate::{GremlinError, GremlinResult, WebSocketOptions}; | ||
use crate::{GremlinError, GremlinResult, IoProtocol}; | ||
|
||
use crate::connection::ConnectionOptions; | ||
|
||
|
@@ -165,7 +165,7 @@ impl Conn { | |
|
||
sender_loop(sink, requests.clone(), receiver); | ||
|
||
receiver_loop(stream, requests.clone(), sender.clone()); | ||
receiver_loop(stream, requests.clone(), sender.clone(), opts.deserializer); | ||
|
||
Ok(Conn { | ||
sender, | ||
|
@@ -266,6 +266,7 @@ fn receiver_loop( | |
mut stream: SplitStream<WSStream>, | ||
requests: Arc<Mutex<HashMap<Uuid, Sender<GremlinResult<Response>>>>>, | ||
mut sender: Sender<Cmd>, | ||
deserializer: IoProtocol, | ||
) { | ||
task::spawn(async move { | ||
loop { | ||
|
@@ -283,10 +284,24 @@ fn receiver_loop( | |
} | ||
Some(Ok(item)) => match item { | ||
Message::Binary(data) => { | ||
let response: Response = serde_json::from_slice(&data).unwrap(); | ||
let response = deserializer | ||
.read_response(data) | ||
.expect("Unable to parse message"); | ||
let mut guard = requests.lock().await; | ||
|
||
//GraphBinary permits a null response request id, so in lieu of a request id assume | ||
//a single entry in the requests to be the one we should respond to given connection | ||
//multiplexing isn't currently implemented | ||
let request_id = response.request_id.unwrap_or_else(|| { | ||
Comment on lines
+292
to
+294
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At least on the GraphBinary side its documented that request Id can come back null. This seemed like the best means to try to mitigate that when it happens. Since connection multiplexing isn't in play, it seems like this should be an okay tradeoff. The alternative would be removing the whole |
||
if guard.len() == 1 { | ||
guard.keys().next().expect("Should have had only 1 key").clone() | ||
} else { | ||
panic!("Request response without request id was received, but there isn't only 1 request currently submitted"); | ||
} | ||
}); | ||
|
||
if response.status.code != 206 { | ||
let item = guard.remove(&response.request_id); | ||
let item = guard.remove(&request_id); | ||
drop(guard); | ||
if let Some(mut s) = item { | ||
match s.send(Ok(response)).await { | ||
|
@@ -295,7 +310,7 @@ fn receiver_loop( | |
}; | ||
} | ||
} else { | ||
let item = guard.get_mut(&response.request_id); | ||
let item = guard.get_mut(&request_id); | ||
if let Some(s) = item { | ||
match s.send(Ok(response)).await { | ||
Ok(_r) => {} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Each test generally used labels to prevent tests from stepping on each other. However the parameterized serializers will run concurrently. So I used
serial_test
on tests that parameters that operate on the same test don't step on each other and cause flakey failures.