Add topic, partition, and timestamp column kafka publishing support #4771

Merged

devinrsmith
Member

This adds optional configuration for publishing a kafka record with the topic, partition, and/or timestamp as specified by their respective columns.

Additionally, this also adds the ability to specify a default partition.

Fixes #4767
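
A minimal sketch of the per-row behavior described above, not the code from this PR. Every name here (`RecordFieldsSketch`, `Fields`, `resolve`, `pick`) is hypothetical, and the rule that a null cell falls back to the configured default is an assumption made for illustration. It relies on one known fact about the Kafka client: a record's partition and timestamp may be left null, in which case the producer chooses them.

```java
import java.util.Map;

/** Hypothetical illustration; these names are not Deephaven or Kafka APIs. */
final class RecordFieldsSketch {

    /** Per-record fields a publisher would pass along to the Kafka producer. */
    static final class Fields {
        final String topic;
        final Integer partition; // null: let the producer's partitioner choose
        final Long timestampMillis; // null: let the producer assign the timestamp

        Fields(String topic, Integer partition, Long timestampMillis) {
            this.topic = topic;
            this.partition = partition;
            this.timestampMillis = timestampMillis;
        }
    }

    /**
     * Resolves the fields for one row. A null column name means that option is
     * not configured. Assumption: a null cell falls back to the default.
     */
    static Fields resolve(Map<String, Object> row,
            String topicColumn, String defaultTopic,
            String partitionColumn, Integer defaultPartition,
            String timestampColumn) {
        String topic = pick(row, topicColumn, String.class, defaultTopic);
        if (topic == null) {
            throw new IllegalArgumentException("No topic for row and no default topic");
        }
        Integer partition = pick(row, partitionColumn, Integer.class, defaultPartition);
        Long timestamp = pick(row, timestampColumn, Long.class, null);
        return new Fields(topic, partition, timestamp);
    }

    /** Returns the typed cell value for the column, or the fallback if unset or null. */
    private static <T> T pick(Map<String, Object> row, String column, Class<T> type, T fallback) {
        if (column == null) {
            return fallback;
        }
        Object value = row.get(column);
        return value == null ? fallback : type.cast(value);
    }
}
```

With a row containing `Topic` and `Partition` cells, `resolve` uses those values; with no columns configured, it returns the default topic and default partition and leaves the timestamp to the producer.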

This adds optional features for publishing a kafka record with the topic, partition, and/or timestamp as specified by their respective columns.

Additionally, this also adds the ability to easily specify a default partition.

Fixes deephaven#4767
@devinrsmith devinrsmith added this to the November 2023 milestone Nov 3, 2023
@devinrsmith devinrsmith self-assigned this Nov 3, 2023
devinrsmith added a commit that referenced this pull request Nov 15, 2023
Adds the helper methods `TableDefinition#checkHasColumn`, `TableDefinition#checkHasColumns`, and `TableDefinition#getColumnNameSet`.

Additionally, fixes up call sites that were (ab)using `Table#getColumnSourceMap` simply to get the keySet. That call can invoke an extraneous `Table#coalesce`, which can be avoided in these cases.

In support of common scaffolding so #4771 won't need to call `Table#getColumnSource` for validation purposes.
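
A rough sketch of what name-based validation of this kind could look like, assuming a definition that already knows its column names. `DefinitionSketch` is a hypothetical stand-in; the real `TableDefinition` signatures and exception types are not shown in this thread and may differ. The point is that checking names against a definition needs no column sources, so no table has to be coalesced.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a table definition; not the Deephaven class. */
final class DefinitionSketch {
    private final Set<String> columnNames;

    DefinitionSketch(List<String> names) {
        // LinkedHashSet keeps the declared column order
        this.columnNames = Collections.unmodifiableSet(new LinkedHashSet<>(names));
    }

    /** Column names only; no column sources are touched. */
    Set<String> getColumnNameSet() {
        return columnNames;
    }

    /** Throws if the single named column is absent. */
    void checkHasColumn(String name) {
        if (!columnNames.contains(name)) {
            throw new IllegalArgumentException("Missing column: " + name);
        }
    }

    /** Throws once, listing every absent column, rather than failing on the first. */
    void checkHasColumns(Collection<String> names) {
        List<String> missing = new ArrayList<>();
        for (String name : names) {
            if (!columnNames.contains(name)) {
                missing.add(name);
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing columns: " + missing);
        }
    }
}
```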
@devinrsmith devinrsmith requested a review from chipkent November 15, 2023 21:20
chipkent
chipkent previously approved these changes Nov 16, 2023
*
* @param destDataType the destination data type
*/
public final void checkCastTo(Class<?> destDataType) {
Member

Usually we just have a cast method that returns the type we like and (maybe) validates internally.

Member Author

I think it's a little bit different w/ ColumnDefinition. In other cases where cast is used, it is typically done to match types b/c the consumer wants to get data out. In this case, ColumnDefinition itself doesn't hold any data; it is just parameterized to support explicit typing around Class<TYPE> getDataType().

I don't see a practical use for wanting to actually cast a definition. Furthermore, it breaks the contract:

ColumnDefinition<Integer> intColDef = ...;
ColumnDefinition<Number> numColDef = intColDef.cast(Number.class);

Class<Number> numberClass = numColDef.getDataType();
// This will fail
assertEquals(Number.class, numberClass);

We do play fast and loose with this in other places; ColumnSource suffers from the same getType problem, although again, ColumnSource#cast is usually used to get data out (not b/c somebody wants getType).

In some more recent code, I've tried to "do the right thing"; io.deephaven.functions.ToObjectFunction#cast
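
To make the contract problem concrete, here is a self-contained sketch. `ColumnDefSketch` is a hypothetical stand-in, not the actual `ColumnDefinition`, and its `cast` is written only to show what an unchecked re-typing would do. The validate-only `checkCastTo` uses `Class#isAssignableFrom` as an assumed rule; the real `TypeHelper.checkCastTo` may apply different checks.

```java
/** Hypothetical stand-in for a typed column definition; not the Deephaven class. */
final class ColumnDefSketch<T> {
    private final String name;
    private final Class<T> dataType;

    ColumnDefSketch(String name, Class<T> dataType) {
        this.name = name;
        this.dataType = dataType;
    }

    Class<T> getDataType() {
        return dataType;
    }

    /** Validates only: throws if values of this column could not be viewed as destDataType. */
    void checkCastTo(Class<?> destDataType) {
        if (!destDataType.isAssignableFrom(dataType)) {
            throw new ClassCastException("[" + name + "] cannot cast "
                    + dataType.getName() + " to " + destDataType.getName());
        }
    }

    /**
     * Unchecked re-typing, shown only to illustrate the problem: the static type
     * changes, but getDataType() still returns the original class object.
     */
    @SuppressWarnings("unchecked")
    <D> ColumnDefSketch<D> cast(Class<D> destDataType) {
        checkCastTo(destDataType);
        return (ColumnDefSketch<D>) this;
    }

    public static void main(String[] args) {
        ColumnDefSketch<Integer> intColDef = new ColumnDefSketch<>("X", Integer.class);
        ColumnDefSketch<Number> numColDef = intColDef.cast(Number.class);
        // Statically a Class<Number>, but at runtime it is Integer.class
        Class<Number> numberClass = numColDef.getDataType();
        System.out.println(numberClass == Number.class); // prints "false"
    }
}
```

A validate-only method sidesteps this: callers learn whether the column's type is compatible without ever holding a definition whose type parameter disagrees with `getDataType()`.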

* @param destDataType the destination data type
*/
public final void checkCastTo(Class<?> destDataType) {
TypeHelper.checkCastTo("[" + name + "]", dataType, destDataType);
Member

This context being passed does not mesh well with the exception message as currently composed.

Member Author

I think you took back this point.

Contributor

@jmao-denver jmao-denver left a comment

The Python changes LGTM

@devinrsmith devinrsmith enabled auto-merge (squash) November 17, 2023 16:05
@devinrsmith devinrsmith merged commit 53049b8 into deephaven:main Nov 17, 2023
10 checks passed
@devinrsmith devinrsmith deleted the kafka-record-fields-from-column-source branch November 17, 2023 16:19
@github-actions github-actions bot locked and limited conversation to collaborators Nov 17, 2023
@deephaven-internal
Contributor

Labels indicate documentation is required. Issues for documentation have been opened:

How-to: https://github.com/deephaven/deephaven.io/issues/3450
Reference: https://github.com/deephaven/deephaven.io/issues/3451

Successfully merging this pull request may close these issues.

Support kafka publishing into specific partition on row-by-row basis
5 participants