Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream crds node updates via geyser #3434

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

gregcusack
Copy link

@gregcusack gregcusack commented Nov 1, 2024

Towards removing RPCs from the validator.

Summary of Changes

Stream crds node updates via geyser
Stream node removals from crds via geyser

Expose an FFI interface that wraps ContactInfo
FFI based off of FFI Transaction Interface here: #2125

@gregcusack gregcusack force-pushed the geyser-gossip branch 2 times, most recently from 197d4dd to a9c4800 Compare November 7, 2024 06:12
@gregcusack gregcusack force-pushed the geyser-gossip branch 2 times, most recently from 6b6e493 to d88db4b Compare November 7, 2024 18:43
Copy link

@godmodegalactus godmodegalactus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. Just dont like unsafe code that much.

/// Default is true -- if the plugin is not interested in
/// gossip messages, please return false.
fn node_update_notifications_enabled(&self) -> bool {
true

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be false by default.

#[repr(C)]
#[derive(Clone, Copy, Debug, Default)]
pub struct FfiSocketAddr {
pub is_v4: u8, // 1 if IPv4, 0 if IPv6

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be

struct FfiSocketAddr {
    ip: FfiIpAddr,
    port: u16
}

@@ -565,6 +593,7 @@ impl Crds {
match value.value.data() {
CrdsData::ContactInfo(_) => {
self.nodes.swap_remove(&index);
self.notify_remove_node(&value.value.pubkey());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All CrdsData is being notified during update, while only contact info is notified during remove.
Could be intended behavior.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so CrdsData is only being updated on ContactInfo as well. See: https://github.com/gregcusack/solana/blob/bd9dbe00404343a01072a75970a5520dbb426c21/gossip/src/crds.rs#L256-L258

this^ function does the check for ContactInfo instead of the check being done in crds.insert().

@gregcusack
Copy link
Author

Overall LGTM. Just dont like unsafe code that much.

ya i don't either. However, I am not sure how possible it is to create an FFI interface that is completely safe...

Comment on lines +465 to +477
#[allow(unused_variables)]
fn notify_node_update(&self, interface: &FfiContactInfoInterface) -> Result<()> {
Ok(())
}

/// Called when a node is removed from the network
/// TODO: may need to provide wrapper here? Also maybe ok
/// if we marshall to repr(c) for this...
#[allow(unused_variables)]
fn notify_node_removal(&self, pubkey: &FfiPubkey) -> Result<()> {
Ok(())
}

Copy link
Author

@gregcusack gregcusack Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also this is not a proper FFI interface since it is returning a Result<()>, so need to update. Although it may make sense to do this in stages. need to think about it

@behzadnouri
Copy link

behzadnouri commented Nov 11, 2024

This is adding more work to Crds::insert while we hold an exclusive lock, so I would rather not to go with this approach.

Better alternative is to pull ContactInfos only when needed or every once in a while (e.g. every 5 seconds, or 10 seconds, whatever). Similar logic to TTL expiry in ClusterNodesCache in turbine:
https://github.com/anza-xyz/agave/blob/7629a152d/turbine/src/cluster_nodes.rs#L475

See also: #46 (comment)

@gregcusack
Copy link
Author

This is adding more work to Crds::insert while we hold an exclusive lock, so I would rather not to go with this approach.

Ya this is a good point

Better alternative is to pull ContactInfos only when needed or every once in a while (e.g. every 5 seconds, or 10 seconds, whatever). Similar logic to TTL expiry in ClusterNodesCache in turbine: https://github.com/anza-xyz/agave/blob/7629a152d/turbine/src/cluster_nodes.rs#L475

See also: #46 (comment)

ya going to look into this approach this week. Geyser is more of a push service with the general idea pushing data into some backend DB that an RPC service can query. But we have to have some service sitting in front of the DB ingesting the streamed data. So, we could modify that service to periodically query a node and get the current set of known ContactInfos. Although this forces the Validator to implement some time of RPC (same RPC we are trying to remove from the valdiator) to handle and respond to these requests.

@gregcusack
Copy link
Author

In this PR, we pass an opaque pointer, ContactInfoPtr, to the consumer instead of directly exposing ContactInfo. This will keep ContactInfo internal details hidden from C/other language consumers.

If we end up changing ContactInfo down the road (aka by adding some new fields), we only need to add the corresponding getter functions to FfiContactInfoInterface struct. This actually not ideal since consumers would have to update their own code to take in the FfiContactInfoInterface struct.

What we could do, is define FfiContactInfoInterface like:

#[repr(C)]
pub struct FfiContactInfoInterface {
    pub contact_info_ptr: ContactInfoPtr,
}

// When creating the interface, you only set the pointer
pub unsafe fn create_contact_info_interface(contact_info: &ContactInfo) -> FfiContactInfoInterface {
    FfiContactInfoInterface {
        contact_info_ptr: contact_info as *const ContactInfo as ContactInfoPtr,
    }
}

and then define the extern "C" methods like:

extern "C" fn get_pubkey(contact_info_ptr: ContactInfoPtr) -> Key {
    let contact_info = unsafe { &*(contact_info_ptr as *const ContactInfo) };
    contact_info.pubkey().as_ref().as_ptr()
}
// etc...

Problem with this is that then all of these methods pollute the global namespace with the function names. But at least we could change ContactInfo whenever and then would only need to define the new extern "C" fn new_method().... The consumer would be able to call this new method no problem and/or continue to operate as normal since the FfiContactInfoInterface struct would not have changed.

Regardless if we go with a push or pull method, we still need some FFI interface. Curious to hear thoughts on this.

thiserror::Error,
};

#[repr(C)]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably best to give this an explicit u type (ie. u32)?
even with repr(C) here, if we added enough variants then it could become something new.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants