Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process fetch responses #64

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open

Process fetch responses #64

wants to merge 3 commits into from

Conversation

jost-s
Copy link
Contributor

@jost-s jost-s commented Jan 3, 2025

Add a queue to process incoming responses to fetch requests. This completes the cycle from sending fetch request -> sending fetch response -> receiving fetch response.

There's a rename stacked onto this PR. Proto definition changes Request to FetchRequest and Response to FetchResponse. That makes the PR look bigger than it is.

resolves #62

@jost-s jost-s changed the base branch from main to feat/fetch-response-queue January 3, 2025 02:16
@jost-s jost-s force-pushed the feat/process-fetch-responses branch from 59c92a7 to 153b705 Compare January 3, 2025 02:21
crates/api/src/op_store.rs Outdated Show resolved Hide resolved
@jost-s jost-s force-pushed the feat/process-fetch-responses branch from 153b705 to 85f839e Compare January 3, 2025 02:28
@jost-s jost-s requested a review from a team January 3, 2025 02:28
@jost-s jost-s force-pushed the feat/fetch-response-queue branch from 4e00657 to 57b3547 Compare January 3, 2025 02:32
@jost-s jost-s force-pushed the feat/process-fetch-responses branch from 85f839e to f338b3a Compare January 3, 2025 02:34
@jost-s jost-s force-pushed the feat/fetch-response-queue branch 9 times, most recently from b975539 to 5c335a0 Compare January 8, 2025 22:56
Base automatically changed from feat/fetch-response-queue to main January 9, 2025 00:32
@jost-s jost-s force-pushed the feat/process-fetch-responses branch 4 times, most recently from fa3efea to a4a1e86 Compare January 16, 2025 19:54
neonphog
neonphog previously approved these changes Jan 16, 2025
Copy link
Collaborator

@neonphog neonphog left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me 👍

Copy link
Member

@ThetaSinner ThetaSinner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking great. I have a few comments to add but all of them are minor :)

@@ -3,6 +3,7 @@
use crate::{
builder, config, BoxFut, DhtArc, K2Result, OpId, SpaceId, Timestamp,
};
use bytes::Bytes;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most places we've used bytes::Bytes rather than importing. I'm not super fussy which we use but I'd slightly prefer to be consistent.

"Your OCD is your own problem" is a valid response to this minor comment :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh and that is actually the only change to this file, is to make sure this type is imported. Do you have a preference for importing everywhere then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've seen this and adopted it too, until I wondered why we're doing this. In several places there are these half-qualified types and it makes the code generally more wordy and harder to read. Coming to think of it, I only find it useful when there are identical or similar types in different crates. Bytes is unique and so are probably all of our own types like the stores and the factories.
In that light I prefer importing everywhere.

@@ -466,6 +476,33 @@ impl CoreFetch {
}
}

async fn spawn_incoming_response_task(
incoming_response_rx: Arc<
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be in an arc+mutex? We're doing a single receive task here and not multiple like with the other side?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't 👍

@@ -303,6 +304,15 @@ impl CoreFetch {
));
tasks.push(incoming_request_task);

// Spawn incoming response task.
let incoming_response_task =
tokio::task::spawn(CoreFetch::spawn_incoming_response_task(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor thing - it's slightly odd to have the function called spawn_xxx_task but have it actually return a future task that needs to be spawned.

Either the tokio::spawn could go inside the function and the function just returns the join handle or could the function just be incoming_response_task?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did that. I like it better too.

crates/core/src/factories/core_fetch.rs Outdated Show resolved Hide resolved
time::Duration,
};

const SPACE_ID: SpaceId = SpaceId(id::Id(bytes::Bytes::from_static(b"space1")));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Id love some test data somewhere that has a few things like this. Not something that needs to be done on this PR but if anybody spots an opportunity to put something like that in place, it'd be very welcome. I'll look for an opportunity too :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added the test space id to the test utils.


#[tokio::test(flavor = "multi_thread")]
async fn incoming_ops_are_written_to_op_store() {
let builder =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the setup be extracted from the tests? It looks like there's some duplicate code here to wire up the peer store, transport and fetch module

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, done.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And also done for the other test files.

)
.unwrap();

tokio::time::sleep(Duration::from_millis(10)).await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please can this be a tokio::time::timeout with a higher timeout that checks for the condition until success?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly, because it's giving fetch the chance to process incoming ops to later on check if something has not happened. I don't see how I could check for the ops having been processed and nothing having been written to the ops store to know when to assert this.

@jost-s jost-s requested a review from ThetaSinner January 18, 2025 02:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[k2] fetch response handler
3 participants