Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kafka connection properties to profile #791

Merged
merged 4 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/confluent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl From<ConfluentProfile> for KafkaConfig {
password: c.secret,
},
schema_registry_enum: Some(c.schema_registry.into()),
connection_properties: HashMap::new(),
}
}
}
Expand Down
21 changes: 15 additions & 6 deletions crates/arroyo-connectors/src/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl KafkaConnector {
authentication: auth,
bootstrap_servers: BootstrapServers(pull_opt("bootstrap_servers", options)?),
schema_registry_enum: schema_registry,
connection_properties: HashMap::new(),
})
}

Expand Down Expand Up @@ -926,13 +927,21 @@ pub fn client_configs(
}
};

for (k, v) in connection.connection_properties.iter() {
client_configs.insert(k.to_string(), v.to_string());
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change we'll be able to set client configs from both the connection and the table. I think it would be good to warn when iterating through the table configs (on 939) if both define the same property.

if let Some(table) = table {
client_configs.extend(
table
.client_configs
.iter()
.map(|(k, v)| (k.to_string(), v.to_string())),
);
for (k, v) in table.client_configs.iter() {
if connection.connection_properties.contains_key(k) {
warn!(
"rdkafka config key {:?} defined in both connection and table config",
k
);
}

client_configs.insert(k.to_string(), v.to_string());
}
}

Ok(client_configs)
Expand Down
12 changes: 12 additions & 0 deletions crates/arroyo-connectors/src/kafka/profile.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@
"sensitive": ["apiSecret"]
}
]
},
"connectionProperties": {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a bit better as as map instead of a vec so that users don't need to know the k/v syntax. This is how we do property overrides on the table:

"client_configs": {
"type": "object",
"title": "Client Configs",
"description": "Additional Kafka configs to pass to the underlying Kafka consumer or producer. See [Kafka Consumer Configs](https://kafka.apache.org/documentation/#consumerconfigs) and [Kafka Producer Configs](https://kafka.apache.org/documentation/#producerconfigs) for more details.",
"additionalProperties": {
"type": "string"
}
},

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 agreed map is better

"type": "object",
"title": "Connection Properties",
"description": "Key-value pairs of rdkafka configuration options",
"items": {
"type": "string",
"title": "property"
},
"additionalProperties": {
"type": "string"
}
}
},
"required": ["bootstrapServers", "authentication"]
Expand Down
Loading