Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topic, partition, and timestamp column kafka publishing support #4771

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,30 @@ public void describeDifferences(@NotNull List<String> differences, @NotNull fina
}
}

/**
* Checks if objects of type {@link #getDataType() dataType} can be cast to {@code destDataType} (equivalent to
* {@code destDataType.isAssignableFrom(dataType)}). If not, this throws a {@link ClassCastException}.
*
* @param destDataType the destination data type
*/
public final void checkCastTo(Class<?> destDataType) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually we just have a cast method that returns the type we like and (maybe) validates internally.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a little bit different w/ ColumnDefinition. In other cases cases where cast is used, it is typically done to match types b/c the consumer wants to get data out. In this case, ColumnDefinition itself doesn't hold any data, it is just parameterized to support explicit typing around Class <TYPE> getDataType().

I don't see a practical use for wanting to actually cast a definition. Furthermore, it breaks the contract:

ColumnDefinition<Integer> intColDef = ...;
ColumnDefinition<Number> numColDef = intColDef.cast(Number.class);

Class<Number> numberClass = numColDef.getDataType();
// This will fail
assertEquals(Number.class, numberClass);

We do play loose and fast with this in other places; ColumnSource suffers from the same getType problem - although again, ColumnSource#cast is usually used to get data out (not b/c somebody wants getType).

In some more recent code, I've tried to "do the right thing"; io.deephaven.functions.ToObjectFunction#cast

TypeHelper.checkCastTo("[" + name + "]", dataType, destDataType);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This context being passed does not mesh well with the exception message as currently composed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you took back this point.

devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Checks if objects of type {@link #getDataType() dataType} can be cast to {@code destDataType} (equivalent to
* {@code destDataType.isAssignableFrom(dataType)}) and checks that objects of type {@link #getComponentType()
* componentType} can be cast to {@code destComponentType} (both component types must be present and cast-able, or
* both must be {@code null}; when both present, is equivalent to
* {@code destComponentType.isAssignableFrom(componentType)}). If not, this throws a {@link ClassCastException}.
*
* @param destDataType the destination data type
* @param destComponentType the destination component type, may be {@code null}
*/
public final void checkCastTo(Class<?> destDataType, @Nullable Class<?> destComponentType) {
TypeHelper.checkCastTo("[" + name + "]", dataType, componentType, destDataType, destComponentType);
}

public boolean equals(final Object other) {
if (this == other) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ default Object exportElement(T tuple, int elementIndex) {
* {@code String} data:
*
* <pre>
* ColumnSource&lt;String&gt; colSource = table.getColumnSource("MyString").getParameterized(String.class)
* ColumnSource&lt;String&gt; colSource = table.getColumnSource("MyString").cast(String.class)
* </pre>
* <p>
* Due to the nature of type erasure, the JVM will still insert an additional cast to {@code TYPE} when elements are
Expand All @@ -163,11 +163,7 @@ default Object exportElement(T tuple, int elementIndex) {
@FinalDefault
default <TYPE> ColumnSource<TYPE> cast(Class<? extends TYPE> clazz) {
Require.neqNull(clazz, "clazz");
final Class<?> columnSourceType = getType();
if (!clazz.isAssignableFrom(columnSourceType)) {
throw new ClassCastException(String.format("Cannot convert column source for type %s to type %s",
columnSourceType.getName(), clazz.getName()));
}
TypeHelper.checkCastTo("ColumnSource", getType(), clazz);
// noinspection unchecked
return (ColumnSource<TYPE>) this;
}
Expand All @@ -184,7 +180,7 @@ default <TYPE> ColumnSource<TYPE> cast(Class<? extends TYPE> clazz) {
* {@code String} data:
*
* <pre>
* ColumnSource&lt;String&gt; colSource = table.getColumnSource("MyString", null).getParameterized(String.class)
* ColumnSource&lt;String&gt; colSource = table.getColumnSource("MyString").cast(String.class, null)
* </pre>
* <p>
* Due to the nature of type erasure, the JVM will still insert an additional cast to {@code TYPE} when elements are
Expand All @@ -197,19 +193,10 @@ default <TYPE> ColumnSource<TYPE> cast(Class<? extends TYPE> clazz) {
*/
@FinalDefault
default <TYPE> ColumnSource<TYPE> cast(Class<? extends TYPE> clazz, @Nullable Class<?> componentType) {
final ColumnSource<TYPE> casted = cast(clazz);
final Class<?> columnSourceComponentType = getComponentType();
if ((componentType == null && columnSourceComponentType == null) || (componentType != null
&& columnSourceComponentType != null && componentType.isAssignableFrom(columnSourceComponentType))) {
return casted;
}
final Class<?> columnSourceType = getType();
throw new ClassCastException(String.format(
"Cannot convert column source componentType for type %s to %s (for %s / %s)",
columnSourceComponentType == null ? null : columnSourceComponentType.getName(),
componentType == null ? null : componentType.getName(),
columnSourceType.getName(),
clazz.getName()));
Require.neqNull(clazz, "clazz");
TypeHelper.checkCastTo("ColumnSource", getType(), getComponentType(), clazz, componentType);
// noinspection unchecked
return (ColumnSource<TYPE>) this;
}

/**
Expand Down
18 changes: 18 additions & 0 deletions engine/api/src/main/java/io/deephaven/engine/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,33 +228,51 @@ public interface Table extends
* caller expects. This differs from {@link #getColumnSource(String, Class)} which uses the provided {@link Class}
* object to verify that the data type is a subclass of the expected class.
*
* <p>
* The success of this call is equivalent to {@code getDefinition().checkColumn(sourceName)}, which is the preferred
* way to check for compatibility in scenarios where the caller does not want the implementation to potentially
* invoke {@link #coalesce()}.
*
* @param sourceName The name of the column
* @param <T> The target type, as a type parameter. Inferred from context.
* @return The column source for {@code sourceName}, parameterized by {@code T}
* @see TableDefinition#checkHasColumn(String)
*/
<T> ColumnSource<T> getColumnSource(String sourceName);

/**
* Retrieves a {@code ColumnSource} and {@link ColumnSource#cast(Class) casts} it to the target class {@code clazz}.
*
* <p>
* The success of this call is equivalent to {@code getDefinition().checkColumn(sourceName, clazz)}, which is the
* preferred way to check for compatibility in scenarios where the caller does not want to the implementation to
* potentially invoke {@link #coalesce()}.
*
* @param sourceName The name of the column
* @param clazz The target type
* @param <T> The target type, as a type parameter. Intended to be inferred from {@code clazz}.
* @return The column source for {@code sourceName}, parameterized by {@code T}
* @see ColumnSource#cast(Class)
* @see TableDefinition#checkHasColumn(String, Class)
*/
<T> ColumnSource<T> getColumnSource(String sourceName, Class<? extends T> clazz);

/**
* Retrieves a {@code ColumnSource} and {@link ColumnSource#cast(Class, Class)} casts} it to the target class
* {@code clazz} and {@code componentType}.
*
* <p>
* The success of this call is equivalent to {@code getDefinition().checkColumn(sourceName, clazz, componentType)},
* which is the preferred way to check for compatibility in scenarios where the caller does not want the
* implementation to potentially invoke {@link #coalesce()}.
*
* @param sourceName The name of the column
* @param clazz The target type
* @param componentType The target component type, may be null
* @param <T> The target type, as a type parameter. Intended to be inferred from {@code clazz}.
* @return The column source for {@code sourceName}, parameterized by {@code T}
* @see ColumnSource#cast(Class, Class)
* @see TableDefinition#checkHasColumn(String, Class, Class)
*/
<T> ColumnSource<T> getColumnSource(String sourceName, Class<? extends T> clazz, @Nullable Class<?> componentType);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,40 @@ public final void checkHasColumn(@NotNull String columnName) {
NoSuchColumnException.throwIf(getColumnNameSet(), columnName);
}

/**
* Checks if {@code columnName} exists and supports {@link ColumnDefinition#checkCastTo(Class)} with {@code clazz}.
* Otherwise, throws a {@link NoSuchColumnException} or a {@link ClassCastException}.
*
* @param columnName the column name
* @param clazz the data type
* @see ColumnDefinition#checkCastTo(Class)
*/
public final void checkHasColumn(@NotNull String columnName, @NotNull Class<?> clazz) {
final ColumnDefinition<?> cd = getColumn(columnName);
if (cd == null) {
throw new NoSuchColumnException(getColumnNameSet(), columnName);
}
cd.checkCastTo(clazz);
}

/**
* Checks if {@code columnName} exists and supports {@link ColumnDefinition#checkCastTo(Class, Class)} with
* {@code clazz} and {@code componentType}. Otherwise, throws a {@link NoSuchColumnException} or a
* {@link ClassCastException}.
*
* @param columnName the column name
* @param clazz the data type
* @param componentType the component type
* @see ColumnDefinition#checkCastTo(Class, Class)
*/
public final void checkHasColumn(@NotNull String columnName, @NotNull Class<?> clazz, Class<?> componentType) {
final ColumnDefinition<?> cd = getColumn(columnName);
if (cd == null) {
throw new NoSuchColumnException(getColumnNameSet(), columnName);
}
cd.checkCastTo(clazz, componentType);
}

/**
* Check this definition to ensure that all {@code columns} are present.
*
Expand Down
36 changes: 36 additions & 0 deletions engine/api/src/main/java/io/deephaven/engine/table/TypeHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.engine.table;

class TypeHelper {

// Could be good to move this to io.deephaven.qst.type.Type layer

public static void checkCastTo(String context, Class<?> srcType, Class<?> destType) {
if (!destType.isAssignableFrom(srcType)) {
throw new ClassCastException(String.format("Cannot convert %s of type %s to type %s",
context, srcType.getName(), destType.getName()));
}
}

public static void checkCastTo(
String prefix,
Class<?> srcType,
Class<?> srcComponentType,
Class<?> destType,
Class<?> destComponentType) {
checkCastTo(prefix, srcType, destType);
if ((srcComponentType == null && destComponentType == null) || (srcComponentType != null
&& destComponentType != null && destComponentType.isAssignableFrom(srcComponentType))) {
return;
}
throw new ClassCastException(String.format(
"Cannot convert %s componentType of type %s to %s (for %s / %s)",
prefix,
srcComponentType == null ? null : srcComponentType.getName(),
destComponentType == null ? null : destComponentType.getName(),
srcType.getName(),
destType.getName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@
package io.deephaven.kafka;

import io.deephaven.annotations.BuildableStyle;
import io.deephaven.api.ColumnName;
import io.deephaven.engine.table.Table;
import io.deephaven.kafka.KafkaTools.Produce;
import io.deephaven.kafka.KafkaTools.Produce.KeyOrValueSpec;
import org.immutables.value.Value.Check;
import org.immutables.value.Value.Default;
import org.immutables.value.Value.Immutable;

import javax.annotation.Nullable;
import java.time.Instant;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;

/**
Expand All @@ -34,16 +39,26 @@ public static Builder builder() {
public abstract Table table();

/**
* The kafka topic to publish to.
* The default Kafka topic to publish to. When {@code null}, {@link #topicColumn()} must be set.
*
* @return the kafka topic
* @return the default Kafka topic
* @see #topicColumn()
*/
@Nullable
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
public abstract String topic();

/**
* The kafka configuration properties.
* The default Kafka partition to publish to.
*
* @return the kafka configuration
* @return the default Kafka partition
* @see #partitionColumn()
*/
public abstract OptionalInt partition();

/**
* The Kafka configuration properties.
*
* @return the Kafka configuration
*/
public abstract Properties config();

Expand Down Expand Up @@ -93,6 +108,38 @@ public boolean publishInitial() {
return true;
}

/**
* The topic column. When set, uses the the given {@link CharSequence}-compatible column from {@link #table()} as
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
* the first source for setting the Kafka record topic. When not present, or if the column value is null,
* {@link #topic()} will be used.
*
* @return the topic column name
*/
public abstract Optional<ColumnName> topicColumn();

/**
* The partition column. When set, uses the the given {@code int} column from {@link #table()} as the first source
* for setting the Kafka record partition. When not present, or if the column value is null, {@link #partition()}
* will be used if present. If a valid partition number is specified, that partition will be used when sending the
* record. Otherwise, Kafka will choose a partition using a hash of the key if the key is present, or will assign a
* partition in a round-robin fashion if the key is not present.
*
* @return the partition column name
*/
public abstract Optional<ColumnName> partitionColumn();

/**
* The timestamp column. When set, uses the the given {@link Instant} column from {@link #table()} as the first
* source for setting the Kafka record timestamp. When not present, or if the column value is null, the producer
* will stamp the record with its current time. The timestamp eventually used by Kafka depends on the timestamp type
* configured for the topic. If the topic is configured to use CreateTime, the timestamp in the producer record will
* be used by the broker. If the topic is configured to use LogAppendTime, the timestamp in the producer record will
* be overwritten by the broker with the broker local time when it appends the message to its log.
*
* @return the timestamp column name
*/
public abstract Optional<ColumnName> timestampColumn();

@Check
final void checkNotBothIgnore() {
if (Produce.isIgnore(keySpec()) && Produce.isIgnore(valueSpec())) {
Expand All @@ -114,12 +161,42 @@ final void checkLastBy() {
}
}

@Check
final void checkTopic() {
if (topic() == null && topicColumn().isEmpty()) {
throw new IllegalArgumentException("Must set topic or topicColumn (or both)");
}
}

@Check
final void checkTopicColumn() {
if (topicColumn().isPresent()) {
table().getDefinition().checkHasColumn(topicColumn().get().name(), CharSequence.class);
}
}

@Check
final void checkPartitionColumn() {
if (partitionColumn().isPresent()) {
table().getDefinition().checkHasColumn(partitionColumn().get().name(), int.class);
}
}

@Check
final void checkTimestampColumn() {
if (timestampColumn().isPresent()) {
table().getDefinition().checkHasColumn(timestampColumn().get().name(), Instant.class);
}
}

public interface Builder {

Builder table(Table table);

Builder topic(String topic);

Builder partition(int partition);

Builder config(Properties config);

Builder keySpec(KeyOrValueSpec keySpec);
Expand All @@ -130,6 +207,12 @@ public interface Builder {

Builder publishInitial(boolean publishInitial);

Builder topicColumn(ColumnName columnName);

Builder partitionColumn(ColumnName columnName);

Builder timestampColumn(ColumnName columnName);

KafkaPublishOptions build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1442,12 +1442,16 @@ public static Runnable produceFromTable(KafkaPublishOptions options) {
options.config(),
effectiveTable,
options.topic(),
options.partition().isEmpty() ? null : options.partition().getAsInt(),
keyColumns,
keySpecSerializer,
keySerializer,
valueColumns,
valueSpecSerializer,
valueSerializer,
options.topicColumn().orElse(null),
options.partitionColumn().orElse(null),
options.timestampColumn().orElse(null),
options.publishInitial());
}
return publisherScope::release;
Expand Down
Loading
Loading