incorrect implementation of CreateTopics specification #84

Open
pdeva opened this issue Sep 20, 2024 · 9 comments

Comments

@pdeva
Contributor

pdeva commented Sep 20, 2024

the current implementation uses a map with topic names as keys:

pub struct CreateTopicsRequest {
    /// The topics to create.
    ///
    /// Supported API versions: 0-7
    pub topics: indexmap::IndexMap<super::TopicName, CreatableTopic>,
}

however, this prevents requests which have duplicate topic names.

so this test for example will fail here:
https://github.com/segmentio/kafka-go/blob/a8e5eabf4a90025a4ad2c28e929324d18db21103/createtopics_test.go#L77

it creates 3 topics, out of which 2 are duplicate.
it then expects an InvalidRequest error code for the duplicated topic, and expects that neither of the duplicated topics is created.

however, with the current implementation the server never receives the duplicate topic: the map silently replaces the data of the earlier entry with that of the later one. server implementations will therefore create the topic instead of returning an error.
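
To make the failure mode concrete, here is a minimal sketch. It assumes `std::collections::HashMap` as a stand-in for `indexmap::IndexMap` (both replace the stored value when an existing key is inserted again). `Topic`, `sample_entries` and `collect_into_map` are hypothetical names for illustration, not types or functions from this crate.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for `CreatableTopic`.
#[derive(Debug, Clone, PartialEq)]
pub struct Topic {
    pub num_partitions: i32,
}

/// Three topics as they could appear on the wire: two share the name "events".
pub fn sample_entries() -> Vec<(String, Topic)> {
    vec![
        ("orders".to_string(), Topic { num_partitions: 1 }),
        ("events".to_string(), Topic { num_partitions: 2 }),
        ("events".to_string(), Topic { num_partitions: 3 }),
    ]
}

/// Collects the entries into a map keyed by topic name.
/// Inserting an existing key overwrites the earlier value, so the
/// duplicate is lost before a server could ever reject it.
pub fn collect_into_map(entries: Vec<(String, Topic)>) -> HashMap<String, Topic> {
    let mut map = HashMap::new();
    for (name, topic) in entries {
        map.insert(name, topic);
    }
    map
}
```

The list returned by `sample_entries` holds three entries, while the map built from it holds only two, with the later "events" entry winning.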

@tychedelia
Owner

Wanted to confirm the Java behavior, and this is the underlying data structure used: ImplicitLinkedHashCollection.

@pdeva
Contributor Author

pdeva commented Sep 20, 2024

maybe i have an older version checked out, i see it implementing ImplicitLinkedHashMultiCollection

@pdeva
Contributor Author

pdeva commented Sep 20, 2024

confirming that even in the latest kafka commit, topics are defined as

public static class CreatableTopicCollection extends ImplicitLinkedHashMultiCollection<CreatableTopic>

the MultiCollection part allows it to hold multiple topics with the same name

@pdeva
Contributor Author

pdeva commented Sep 20, 2024

i don't think anything in the wire protocol uses maps
https://kafka.apache.org/protocol#protocol_api_keys

the only collection type that seems to be used is a list.

the java implementation replaces that with a MultiCollection to make it easier to look up by key and detect duplicates, but it still keeps the semantics of a list.
https://github.com/apache/kafka/blob/3783385dc1cc27246cf09ec791e4b43f577a26ea/generator/src/main/java/org/apache/kafka/message/FieldSpec.java#L527:L529

this is different from the behavior in this lib.
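
As a rough illustration of list semantics with duplicate detection, the sketch below scans a plain list of topic names and reports the ones that appear more than once. `duplicated_names` is a hypothetical helper, not part of this crate or of the Java implementation; a server could use something like it to decide which topics get an InvalidRequest error.

```rust
use std::collections::HashMap;

/// Returns every name that occurs more than once in `names`,
/// each reported once, in order of first appearance.
pub fn duplicated_names<'a>(names: &[&'a str]) -> Vec<&'a str> {
    // First pass: count occurrences of each name.
    let mut counts: HashMap<&'a str, usize> = HashMap::new();
    for &name in names {
        *counts.entry(name).or_insert(0) += 1;
    }

    // Second pass: keep list order, which a HashMap alone would not give us.
    let mut duplicates = Vec::new();
    for &name in names {
        if counts[name] > 1 && !duplicates.contains(&name) {
            duplicates.push(name);
        }
    }
    duplicates
}
```

This only works because the input is a list: once the names have been collapsed into a map keyed by name, the information needed to detect the duplicate is already gone.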

@rukai
Collaborator

rukai commented Sep 27, 2024

This sounds like a problem!
I need to verify if this does actually affect all map types in the protocol.

But assuming for a second that it does, I wonder how best to solve this.
We could just change all of our map types from IndexMap<K, T> to IndexMap<K, Vec<T>>.
But that adds an extra allocation for every element.
We could avoid that by using https://docs.rs/smallvec/latest/smallvec/ so that a single value is stored inline and only 2 or more values spill to the heap.

However, such an API would be quite error prone: we would need to manually check the length of the list before using it.

Maybe what we want is:

enum MapEntry<T> {
  Single(T),
  Duplicates(Box<[T]>),
}

impl<T> MapEntry<T> {
  // Ok for the common single-value case, Err with all values on duplicates.
  fn get(&self) -> Result<&T, &[T]> {
    match self {
      MapEntry::Single(x) => Ok(x),
      MapEntry::Duplicates(x) => Err(&x[..]),
    }
  }
}

And then use IndexMap<K, MapEntry<T>>

Actually, maybe we want to simplify that to: IndexMap<K, Result<T, Vec<T>>>
And just document prominently what on earth that is about.

And this all assumes that it is always an error to have a duplicate map key which might not be true in some cases...
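
A minimal sketch of how a decoder could populate such a map, under some loud assumptions: `std::collections::HashMap` stands in for `IndexMap`, the duplicate case uses `Vec<T>` rather than `Box<[T]>` for brevity, and `group_by_key` is a hypothetical helper, not an existing function in this crate.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

pub enum MapEntry<T> {
    Single(T),
    Duplicates(Vec<T>),
}

impl<T> MapEntry<T> {
    /// `Ok` for a unique key, `Err` carrying every value for a duplicated key.
    pub fn get(&self) -> Result<&T, &[T]> {
        match self {
            MapEntry::Single(value) => Ok(value),
            MapEntry::Duplicates(values) => Err(values.as_slice()),
        }
    }
}

/// Groups decoded (key, value) pairs, keeping all values of duplicated keys.
pub fn group_by_key<T>(items: Vec<(String, T)>) -> HashMap<String, MapEntry<T>> {
    let mut map: HashMap<String, MapEntry<T>> = HashMap::new();
    for (key, value) in items {
        match map.entry(key) {
            Entry::Vacant(slot) => {
                slot.insert(MapEntry::Single(value));
            }
            Entry::Occupied(mut slot) => {
                // Temporarily swap in an empty placeholder so the old
                // entry can be moved out and extended.
                let previous =
                    std::mem::replace(slot.get_mut(), MapEntry::Duplicates(Vec::new()));
                let mut values = match previous {
                    MapEntry::Single(first) => vec![first],
                    MapEntry::Duplicates(values) => values,
                };
                values.push(value);
                *slot.get_mut() = MapEntry::Duplicates(values);
            }
        }
    }
    map
}
```

Only duplicated keys pay for a `Vec` allocation here; the common single-value case stays inline in the map entry.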

@Diggsey
Collaborator

Diggsey commented Sep 27, 2024

I think short term the best option would be to make a thin wrapper around a Vec<T> which adds methods to look up by key (and does a linear search of the vec in that case).

Long term you could think about forking IndexMap to create an IndexMultiMap which would natively support multiple values per key.
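
The short-term wrapper could look roughly like the sketch below. `KeyedVec` and its method names are hypothetical, chosen only for illustration; the point is that the list keeps every entry in wire order while still offering lookup by key through a linear search.

```rust
/// Hypothetical wrapper: an ordered list of (key, value) entries
/// that, unlike a map, allows duplicate keys.
#[derive(Debug, Clone, PartialEq)]
pub struct KeyedVec<K, V> {
    entries: Vec<(K, V)>,
}

impl<K: PartialEq, V> KeyedVec<K, V> {
    pub fn new() -> Self {
        Self { entries: Vec::new() }
    }

    /// Appends an entry; an existing entry with the same key is kept.
    pub fn push(&mut self, key: K, value: V) {
        self.entries.push((key, value));
    }

    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// First value stored under `key`, found by linear search.
    pub fn get(&self, key: &K) -> Option<&V> {
        self.entries.iter().find(|(k, _)| k == key).map(|(_, v)| v)
    }

    /// Every value stored under `key`, in insertion order.
    pub fn get_all(&self, key: &K) -> Vec<&V> {
        self.entries
            .iter()
            .filter(|(k, _)| k == key)
            .map(|(_, v)| v)
            .collect()
    }
}
```

Lookups are O(n), which is likely acceptable for the small collections typical of a single request, but that is an assumption worth measuring.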

@tychedelia
Owner

I think we should go for the easy/ugly Vec fix now for correctness but block any 1.0 release on having a proper multi map api similar to the Java reference.

@rukai
Collaborator

rukai commented Sep 30, 2024

Thinking about the ways in which the protocol is actually used I think that replacing IndexMap with Vec might actually be the best for performance.

Since the kafka protocol very rarely has us request data that we don't care about, when processing a response we usually just call .into_iter() and iterate over all the elements, so spending time constructing a hashmap instead of a vec is a waste.

I initially thought that when constructing a hashmap for encoding we certainly don't benefit from being able to perform lookups.
However, I realized that constructing values does actually benefit from lookups: when creating a single batch request for multiple application-side requests, we want to be able to quickly look up existing topics so that we can insert into that existing entry without creating a duplicate.
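
The batching case can be sketched with a plain `Vec` and a linear search. `TopicBatch` and `add_partition` are hypothetical names for illustration; real request types in this crate have more fields.

```rust
/// Hypothetical per-topic entry in a batched request.
#[derive(Debug, PartialEq)]
pub struct TopicBatch {
    pub name: String,
    pub partitions: Vec<i32>,
}

/// Adds `partition` to the entry for `topic`, creating the entry only
/// if the topic is not in the batch yet, so no duplicate is produced.
pub fn add_partition(batch: &mut Vec<TopicBatch>, topic: &str, partition: i32) {
    // Look up by index so no borrow of `batch` is held across the push below.
    if let Some(index) = batch.iter().position(|entry| entry.name == topic) {
        batch[index].partitions.push(partition);
    } else {
        batch.push(TopicBatch {
            name: topic.to_string(),
            partitions: vec![partition],
        });
    }
}
```

With a `Vec` each call costs a linear scan, which is the trade-off against the hashed lookup an `IndexMap` would provide.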

@Diggsey
Collaborator

Diggsey commented Sep 30, 2024

@rukai I think it would still be good to use a wrapper around Vec (with zero-cost conversions) so that it can be changed in a backwards compatible way, and have additional methods provided, like get by key.
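
One way to read "zero-cost conversions" is a newtype whose conversions to and from `Vec<T>` just move the vector. The sketch below assumes that reading; `EntryList` is a hypothetical name, and the private field is what would let the internal representation change later without breaking callers.

```rust
/// Hypothetical newtype over `Vec<T>`. The field is private so the
/// representation can be swapped later (e.g. for a multimap).
#[derive(Debug, Clone, PartialEq)]
pub struct EntryList<T>(Vec<T>);

impl<T> From<Vec<T>> for EntryList<T> {
    /// Moves the vector into the wrapper without copying elements.
    fn from(entries: Vec<T>) -> Self {
        EntryList(entries)
    }
}

impl<T> From<EntryList<T>> for Vec<T> {
    /// Moves the vector back out without copying elements.
    fn from(list: EntryList<T>) -> Self {
        list.0
    }
}

impl<T> IntoIterator for EntryList<T> {
    type Item = T;
    type IntoIter = std::vec::IntoIter<T>;

    /// Keeps the common "iterate over everything" path as cheap as a Vec.
    fn into_iter(self) -> Self::IntoIter {
        self.0.into_iter()
    }
}
```

Extra methods such as a lookup by key could then be added to `EntryList` without changing how existing callers construct or consume it.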
