diff --git a/engine/api/src/main/java/io/deephaven/engine/table/ColumnDefinition.java b/engine/api/src/main/java/io/deephaven/engine/table/ColumnDefinition.java index fe19fcee9c9..e729a627bf8 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/ColumnDefinition.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/ColumnDefinition.java @@ -504,6 +504,30 @@ public void describeDifferences(@NotNull List differences, @NotNull fina } } + /** + * Checks if objects of type {@link #getDataType() dataType} can be cast to {@code destDataType} (equivalent to + * {@code destDataType.isAssignableFrom(dataType)}). If not, this throws a {@link ClassCastException}. + * + * @param destDataType the destination data type + */ + public final void checkCastTo(Class destDataType) { + TypeHelper.checkCastTo("[" + name + "]", dataType, destDataType); + } + + /** + * Checks if objects of type {@link #getDataType() dataType} can be cast to {@code destDataType} (equivalent to + * {@code destDataType.isAssignableFrom(dataType)}) and checks that objects of type {@link #getComponentType() + * componentType} can be cast to {@code destComponentType} (both component types must be present and cast-able, or + * both must be {@code null}; when both present, is equivalent to + * {@code destComponentType.isAssignableFrom(componentType)}). If not, this throws a {@link ClassCastException}. + * + * @param destDataType the destination data type + * @param destComponentType the destination component type, may be {@code null} + */ + public final void checkCastTo(Class destDataType, @Nullable Class destComponentType) { + TypeHelper.checkCastTo("[" + name + "]", dataType, componentType, destDataType, destComponentType); + } + public boolean equals(final Object other) { if (this == other) { return true; diff --git a/engine/api/src/main/java/io/deephaven/engine/table/ColumnSource.java b/engine/api/src/main/java/io/deephaven/engine/table/ColumnSource.java index 8a5e04795d1..9299fcfca7b 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/ColumnSource.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/ColumnSource.java @@ -150,7 +150,7 @@ default Object exportElement(T tuple, int elementIndex) { * {@code String} data: * *
-     *     ColumnSource<String> colSource = table.getColumnSource("MyString").getParameterized(String.class)
+     *     ColumnSource<String> colSource = table.getColumnSource("MyString").cast(String.class)
      * 
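For context, a minimal usage sketch of the now-centralized check (the table and an int column named MyInt are illustrative assumptions; the message shape comes from the new TypeHelper introduced further down):

    // Succeeds: a String column can be viewed through a wider CharSequence source.
    ColumnSource<CharSequence> chars = table.getColumnSource("MyString").cast(CharSequence.class);
    // Fails fast instead of failing later at element access time.
    try {
        ColumnSource<String> bad = table.getColumnSource("MyInt").cast(String.class);
    } catch (ClassCastException e) {
        // e.g. "Cannot convert ColumnSource of type int to type java.lang.String"
    }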
*

* Due to the nature of type erasure, the JVM will still insert an additional cast to {@code TYPE} when elements are @@ -163,11 +163,7 @@ default Object exportElement(T tuple, int elementIndex) { @FinalDefault default ColumnSource cast(Class clazz) { Require.neqNull(clazz, "clazz"); - final Class columnSourceType = getType(); - if (!clazz.isAssignableFrom(columnSourceType)) { - throw new ClassCastException(String.format("Cannot convert column source for type %s to type %s", - columnSourceType.getName(), clazz.getName())); - } + TypeHelper.checkCastTo("ColumnSource", getType(), clazz); // noinspection unchecked return (ColumnSource) this; } @@ -184,7 +180,7 @@ default ColumnSource cast(Class clazz) { * {@code String} data: * *

-     *     ColumnSource<String> colSource = table.getColumnSource("MyString", null).getParameterized(String.class)
+     *     ColumnSource<String> colSource = table.getColumnSource("MyString").cast(String.class, null)
      * 
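A brief sketch of the component-type rule this overload enforces, assuming a String[] column named MyStrings alongside MyString: both component types must be null, or both present and assignable.

    // Array column: data type and component type are both verified.
    ColumnSource<String[]> arrays = table.getColumnSource("MyStrings").cast(String[].class, String.class);
    // Scalar column: its component type is null, so the destination component type must also be null.
    ColumnSource<String> scalars = table.getColumnSource("MyString").cast(String.class, null);
    // Mixing the two forms (e.g. cast(String[].class, null) on MyStrings) throws ClassCastException.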
*

* Due to the nature of type erasure, the JVM will still insert an additional cast to {@code TYPE} when elements are @@ -197,19 +193,10 @@ default ColumnSource cast(Class clazz) { */ @FinalDefault default ColumnSource cast(Class clazz, @Nullable Class componentType) { - final ColumnSource casted = cast(clazz); - final Class columnSourceComponentType = getComponentType(); - if ((componentType == null && columnSourceComponentType == null) || (componentType != null - && columnSourceComponentType != null && componentType.isAssignableFrom(columnSourceComponentType))) { - return casted; - } - final Class columnSourceType = getType(); - throw new ClassCastException(String.format( - "Cannot convert column source componentType for type %s to %s (for %s / %s)", - columnSourceComponentType == null ? null : columnSourceComponentType.getName(), - componentType == null ? null : componentType.getName(), - columnSourceType.getName(), - clazz.getName())); + Require.neqNull(clazz, "clazz"); + TypeHelper.checkCastTo("ColumnSource", getType(), getComponentType(), clazz, componentType); + // noinspection unchecked + return (ColumnSource) this; } /** diff --git a/engine/api/src/main/java/io/deephaven/engine/table/Table.java b/engine/api/src/main/java/io/deephaven/engine/table/Table.java index f872f366076..f06946b2d9c 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/Table.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/Table.java @@ -228,20 +228,32 @@ public interface Table extends * caller expects. This differs from {@link #getColumnSource(String, Class)} which uses the provided {@link Class} * object to verify that the data type is a subclass of the expected class. * + *
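A rough illustration of the difference between the two lookups (column name assumed):

    // Unchecked: T is taken from the assignment context; a mismatch only surfaces when elements are read.
    ColumnSource<String> inferred = table.getColumnSource("MyString");
    // Checked: the supplied Class is verified against the column's declared type before returning.
    ColumnSource<String> verified = table.getColumnSource("MyString", String.class);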

+ * The success of this call is equivalent to {@code getDefinition().checkHasColumn(sourceName)}, which is the preferred + * way to check for compatibility in scenarios where the caller does not want the implementation to potentially + * invoke {@link #coalesce()}. + * * @param sourceName The name of the column * @param The target type, as a type parameter. Inferred from context. * @return The column source for {@code sourceName}, parameterized by {@code T} + * @see TableDefinition#checkHasColumn(String) */ ColumnSource getColumnSource(String sourceName); /** * Retrieves a {@code ColumnSource} and {@link ColumnSource#cast(Class) casts} it to the target class {@code clazz}. * + *
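Where a caller wants that validation without risking a coalesce, a sketch of the definition-first pattern described above (column name assumed):

    // Inspect only the definition; this does not force the implementation to coalesce.
    table.getDefinition().checkHasColumn("MyString");
    // Then fetch the source; the unchecked overload relies on context for the type parameter.
    ColumnSource<String> source = table.getColumnSource("MyString");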

+ * The success of this call is equivalent to {@code getDefinition().checkHasColumn(sourceName, clazz)}, which is the + * preferred way to check for compatibility in scenarios where the caller does not want the implementation to + * potentially invoke {@link #coalesce()}. + * * @param sourceName The name of the column * @param clazz The target type * @param The target type, as a type parameter. Intended to be inferred from {@code clazz}. * @return The column source for {@code sourceName}, parameterized by {@code T} * @see ColumnSource#cast(Class) + * @see TableDefinition#checkHasColumn(String, Class) */ ColumnSource getColumnSource(String sourceName, Class clazz); @@ -249,12 +261,18 @@ public interface Table extends * Retrieves a {@code ColumnSource} and {@link ColumnSource#cast(Class, Class) casts} it to the target class * {@code clazz} and {@code componentType}. * + *
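And the typed variants side by side, under the same assumptions (a String column MyString and, hypothetically, a String[] column Tags):

    // Equivalent pre-check without coalescing, then the checked fetch.
    table.getDefinition().checkHasColumn("MyString", String.class);
    ColumnSource<String> source = table.getColumnSource("MyString", String.class);
    // The three-argument form additionally validates the component type.
    ColumnSource<String[]> tags = table.getColumnSource("Tags", String[].class, String.class);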

+ * The success of this call is equivalent to {@code getDefinition().checkHasColumn(sourceName, clazz, componentType)}, + * which is the preferred way to check for compatibility in scenarios where the caller does not want the + * implementation to potentially invoke {@link #coalesce()}. + * * @param sourceName The name of the column * @param clazz The target type * @param componentType The target component type, may be null * @param The target type, as a type parameter. Intended to be inferred from {@code clazz}. * @return The column source for {@code sourceName}, parameterized by {@code T} * @see ColumnSource#cast(Class, Class) + * @see TableDefinition#checkHasColumn(String, Class, Class) */ ColumnSource getColumnSource(String sourceName, Class clazz, @Nullable Class componentType); diff --git a/engine/api/src/main/java/io/deephaven/engine/table/TableDefinition.java b/engine/api/src/main/java/io/deephaven/engine/table/TableDefinition.java index 66aa6b003f7..45fe5a33a37 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/TableDefinition.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/TableDefinition.java @@ -313,6 +313,40 @@ public final void checkHasColumn(@NotNull String columnName) { NoSuchColumnException.throwIf(getColumnNameSet(), columnName); } + /** + * Checks if {@code columnName} exists and supports {@link ColumnDefinition#checkCastTo(Class)} with {@code clazz}. + * Otherwise, throws a {@link NoSuchColumnException} or a {@link ClassCastException}. + * + * @param columnName the column name + * @param clazz the data type + * @see ColumnDefinition#checkCastTo(Class) + */ + public final void checkHasColumn(@NotNull String columnName, @NotNull Class clazz) { + final ColumnDefinition cd = getColumn(columnName); + if (cd == null) { + throw new NoSuchColumnException(getColumnNameSet(), columnName); + } + cd.checkCastTo(clazz); + } + + /** + * Checks if {@code columnName} exists and supports {@link ColumnDefinition#checkCastTo(Class, Class)} with + * {@code clazz} and {@code componentType}. Otherwise, throws a {@link NoSuchColumnException} or a + * {@link ClassCastException}. + * + * @param columnName the column name + * @param clazz the data type + * @param componentType the component type + * @see ColumnDefinition#checkCastTo(Class, Class) + */ + public final void checkHasColumn(@NotNull String columnName, @NotNull Class clazz, Class componentType) { + final ColumnDefinition cd = getColumn(columnName); + if (cd == null) { + throw new NoSuchColumnException(getColumnNameSet(), columnName); + } + cd.checkCastTo(clazz, componentType); + } + /** * Check this definition to ensure that all {@code columns} are present.
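A compact sketch of how the new TableDefinition overloads report problems (column names and types are assumptions; the message shape follows ColumnDefinition.checkCastTo and the tests added below):

    final TableDefinition def = table.getDefinition();
    def.checkHasColumn("Price", double.class);                 // passes for a double column
    def.checkHasColumn("Tags", String[].class, String.class);  // data type and component type both verified
    // An unknown name throws NoSuchColumnException; an incompatible type throws ClassCastException,
    // e.g. "Cannot convert [Price] of type double to type java.lang.String".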
* diff --git a/engine/api/src/main/java/io/deephaven/engine/table/TypeHelper.java b/engine/api/src/main/java/io/deephaven/engine/table/TypeHelper.java new file mode 100644 index 00000000000..e1029ebd21f --- /dev/null +++ b/engine/api/src/main/java/io/deephaven/engine/table/TypeHelper.java @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.engine.table; + +class TypeHelper { + + // Could be good to move this to io.deephaven.qst.type.Type layer + + public static void checkCastTo(String context, Class srcType, Class destType) { + if (!destType.isAssignableFrom(srcType)) { + throw new ClassCastException(String.format("Cannot convert %s of type %s to type %s", + context, srcType.getName(), destType.getName())); + } + } + + public static void checkCastTo( + String prefix, + Class srcType, + Class srcComponentType, + Class destType, + Class destComponentType) { + checkCastTo(prefix, srcType, destType); + if ((srcComponentType == null && destComponentType == null) || (srcComponentType != null + && destComponentType != null && destComponentType.isAssignableFrom(srcComponentType))) { + return; + } + throw new ClassCastException(String.format( + "Cannot convert %s componentType of type %s to %s (for %s / %s)", + prefix, + srcComponentType == null ? null : srcComponentType.getName(), + destComponentType == null ? null : destComponentType.getName(), + srcType.getName(), + destType.getName())); + } +} diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaPublishOptions.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaPublishOptions.java index 585ca3ee256..7e6a43d4013 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaPublishOptions.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaPublishOptions.java @@ -4,6 +4,7 @@ package io.deephaven.kafka; import io.deephaven.annotations.BuildableStyle; +import io.deephaven.api.ColumnName; import io.deephaven.engine.table.Table; import io.deephaven.kafka.KafkaTools.Produce; import io.deephaven.kafka.KafkaTools.Produce.KeyOrValueSpec; @@ -11,6 +12,10 @@ import org.immutables.value.Value.Default; import org.immutables.value.Value.Immutable; +import javax.annotation.Nullable; +import java.time.Instant; +import java.util.Optional; +import java.util.OptionalInt; import java.util.Properties; /** @@ -34,16 +39,26 @@ public static Builder builder() { public abstract Table table(); /** - * The kafka topic to publish to. + * The default Kafka topic to publish to. When {@code null}, {@link #topicColumn()} must be set. * - * @return the kafka topic + * @return the default Kafka topic + * @see #topicColumn() */ + @Nullable public abstract String topic(); /** - * The kafka configuration properties. + * The default Kafka partition to publish to. * - * @return the kafka configuration + * @return the default Kafka partition + * @see #partitionColumn() + */ + public abstract OptionalInt partition(); + + /** + * The Kafka configuration properties. + * + * @return the Kafka configuration */ public abstract Properties config(); @@ -93,6 +108,38 @@ public boolean publishInitial() { return true; } + /** + * The topic column. When set, uses the the given {@link CharSequence}-compatible column from {@link #table()} as + * the first source for setting the Kafka record topic. When not present, or if the column value is null, + * {@link #topic()} will be used. 
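To make the topic fallback concrete, a hedged builder sketch (topic name, column names, and the value spec are assumptions; the shape mirrors the tests and the Python produce() changes below):

    KafkaPublishOptions options = KafkaPublishOptions.builder()
            .table(table)
            .topic("orders")                           // used whenever the MyTopic cell is null
            .topicColumn(ColumnName.of("MyTopic"))     // per-row topic, consulted first
            .partition(0)                              // optional default partition
            .partitionColumn(ColumnName.of("MyPartition"))
            .timestampColumn(ColumnName.of("MyTimestamp"))
            .config(props)
            .valueSpec(Produce.simpleSpec("MyValue"))
            .build();
    Runnable stopPublishing = KafkaTools.produceFromTable(options);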
+ * + * @return the topic column name + */ + public abstract Optional topicColumn(); + + /** + * The partition column. When set, uses the the given {@code int} column from {@link #table()} as the first source + * for setting the Kafka record partition. When not present, or if the column value is null, {@link #partition()} + * will be used if present. If a valid partition number is specified, that partition will be used when sending the + * record. Otherwise, Kafka will choose a partition using a hash of the key if the key is present, or will assign a + * partition in a round-robin fashion if the key is not present. + * + * @return the partition column name + */ + public abstract Optional partitionColumn(); + + /** + * The timestamp column. When set, uses the the given {@link Instant} column from {@link #table()} as the first + * source for setting the Kafka record timestamp. When not present, or if the column value is null, the producer + * will stamp the record with its current time. The timestamp eventually used by Kafka depends on the timestamp type + * configured for the topic. If the topic is configured to use CreateTime, the timestamp in the producer record will + * be used by the broker. If the topic is configured to use LogAppendTime, the timestamp in the producer record will + * be overwritten by the broker with the broker local time when it appends the message to its log. + * + * @return the timestamp column name + */ + public abstract Optional timestampColumn(); + @Check final void checkNotBothIgnore() { if (Produce.isIgnore(keySpec()) && Produce.isIgnore(valueSpec())) { @@ -114,12 +161,42 @@ final void checkLastBy() { } } + @Check + final void checkTopic() { + if (topic() == null && topicColumn().isEmpty()) { + throw new IllegalArgumentException("Must set topic or topicColumn (or both)"); + } + } + + @Check + final void checkTopicColumn() { + if (topicColumn().isPresent()) { + table().getDefinition().checkHasColumn(topicColumn().get().name(), CharSequence.class); + } + } + + @Check + final void checkPartitionColumn() { + if (partitionColumn().isPresent()) { + table().getDefinition().checkHasColumn(partitionColumn().get().name(), int.class); + } + } + + @Check + final void checkTimestampColumn() { + if (timestampColumn().isPresent()) { + table().getDefinition().checkHasColumn(timestampColumn().get().name(), Instant.class); + } + } + public interface Builder { Builder table(Table table); Builder topic(String topic); + Builder partition(int partition); + Builder config(Properties config); Builder keySpec(KeyOrValueSpec keySpec); @@ -130,6 +207,12 @@ public interface Builder { Builder publishInitial(boolean publishInitial); + Builder topicColumn(ColumnName columnName); + + Builder partitionColumn(ColumnName columnName); + + Builder timestampColumn(ColumnName columnName); + KafkaPublishOptions build(); } } diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index 9dab3b5b194..a8fae8e84af 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -1442,12 +1442,16 @@ public static Runnable produceFromTable(KafkaPublishOptions options) { options.config(), effectiveTable, options.topic(), + options.partition().isEmpty() ? 
null : options.partition().getAsInt(), keyColumns, keySpecSerializer, keySerializer, valueColumns, valueSpecSerializer, valueSerializer, + options.topicColumn().orElse(null), + options.partitionColumn().orElse(null), + options.timestampColumn().orElse(null), options.publishInitial()); } return publisherScope::release; diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/publish/PublishToKafka.java b/extensions/kafka/src/main/java/io/deephaven/kafka/publish/PublishToKafka.java index 03d464d5729..c1759855889 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/publish/PublishToKafka.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/publish/PublishToKafka.java @@ -3,19 +3,28 @@ */ package io.deephaven.kafka.publish; +import io.deephaven.api.ColumnName; import io.deephaven.base.verify.Assert; -import io.deephaven.chunk.attributes.Values; +import io.deephaven.chunk.IntChunk; +import io.deephaven.chunk.LongChunk; +import io.deephaven.chunk.ObjectChunk; import io.deephaven.configuration.Configuration; -import io.deephaven.engine.table.ModifiedColumnSet; -import io.deephaven.engine.table.Table; -import io.deephaven.engine.table.TableUpdate; -import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.liveness.LivenessArtifact; import io.deephaven.engine.liveness.LivenessScope; -import io.deephaven.engine.table.impl.*; -import io.deephaven.chunk.ObjectChunk; import io.deephaven.engine.rowset.RowSequence; import io.deephaven.engine.rowset.RowSet; +import io.deephaven.engine.table.ChunkSource; +import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.ModifiedColumnSet; +import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.TableUpdate; +import io.deephaven.engine.table.impl.BlinkTableTools; +import io.deephaven.engine.table.impl.InstrumentedTableUpdateListenerAdapter; +import io.deephaven.engine.table.impl.QueryTable; +import io.deephaven.engine.table.impl.sources.ReinterpretUtils; +import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.kafka.KafkaPublishOptions; +import io.deephaven.util.QueryConstants; import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.InternalUseOnly; import io.deephaven.util.annotations.ReferentialIntegrity; @@ -26,8 +35,10 @@ import org.apache.kafka.common.serialization.Serializer; import org.jetbrains.annotations.NotNull; +import java.time.Instant; import java.util.Objects; import java.util.Properties; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -43,13 +54,36 @@ public class PublishToKafka extends LivenessArtifact { private final Table table; private final KafkaProducer producer; - private final String topic; + private final String defaultTopic; + private final Integer defaultPartition; private final KeyOrValueSerializer keyChunkSerializer; private final KeyOrValueSerializer valueChunkSerializer; + private final ColumnSource topicColumnSource; + private final ColumnSource partitionColumnSource; + private final ColumnSource timestampColumnSource; @ReferentialIntegrity private final PublishListener publishListener; + /** + * @deprecated please use {@link io.deephaven.kafka.KafkaTools#produceFromTable(KafkaPublishOptions)} + */ + @Deprecated(forRemoval = true) + public PublishToKafka( + final Properties props, + final Table table, + final String topic, + final String[] keyColumns, + final Serializer kafkaKeySerializer, + final KeyOrValueSerializer 
keyChunkSerializer, + final String[] valueColumns, + final Serializer kafkaValueSerializer, + final KeyOrValueSerializer valueChunkSerializer, + final boolean publishInitial) { + this(props, table, topic, null, keyColumns, kafkaKeySerializer, keyChunkSerializer, valueColumns, + kafkaValueSerializer, valueChunkSerializer, null, null, null, publishInitial); + } + /** *

* Construct a publisher for {@code table} according to the Kafka {@code props} for the supplied {@code topic}. * @@ -77,7 +111,8 @@ public class PublishToKafka extends LivenessArtifact { * * @param props The Kafka {@link Properties} * @param table The source {@link Table} - * @param topic The destination topic + * @param defaultTopic The default destination topic + * @param defaultPartition The default destination partition * @param keyColumns Optional array of string column names from table for the columns corresponding to Kafka's Key * field. * @param kafkaKeySerializer The kafka {@link Serializer} to use for keys * @@ -89,26 +124,46 @@ public class PublishToKafka extends LivenessArtifact { * @param valueChunkSerializer Optional {@link KeyOrValueSerializer} to consume table data and produce Kafka record * values in chunk-oriented fashion * @param publishInitial If the initial data in {@code table} should be published + * @param topicColumn The topic column. When set, uses the given {@link CharSequence} column from {@code table} + * as the first source for setting the Kafka record topic. + * @param partitionColumn The partition column. When set, uses the given {@code int} column from {@code table} + * as the first source for setting the Kafka record partition. + * @param timestampColumn The timestamp column. When set, uses the given {@link Instant} column from + * {@code table} as the first source for setting the Kafka record timestamp. */ public PublishToKafka( final Properties props, - final Table table, - final String topic, + Table table, + final String defaultTopic, + final Integer defaultPartition, final String[] keyColumns, final Serializer kafkaKeySerializer, final KeyOrValueSerializer keyChunkSerializer, final String[] valueColumns, final Serializer kafkaValueSerializer, final KeyOrValueSerializer valueChunkSerializer, + final ColumnName topicColumn, + final ColumnName partitionColumn, + final ColumnName timestampColumn, final boolean publishInitial) { - this.table = table; + this.table = (table = table.coalesce()); this.producer = new KafkaProducer<>( props, Objects.requireNonNull(kafkaKeySerializer), Objects.requireNonNull(kafkaValueSerializer)); - this.topic = topic; + this.defaultTopic = defaultTopic; + this.defaultPartition = defaultPartition; this.keyChunkSerializer = keyChunkSerializer; this.valueChunkSerializer = valueChunkSerializer; + this.topicColumnSource = topicColumn == null + ? null + : table.getColumnSource(topicColumn.name(), CharSequence.class); + this.partitionColumnSource = partitionColumn == null + ? null + : table.getColumnSource(partitionColumn.name(), int.class); + this.timestampColumnSource = timestampColumn == null + ? null + : ReinterpretUtils.instantToLongSource(table.getColumnSource(timestampColumn.name(), Instant.class)); if (publishInitial) { // Publish the initial table state try (final PublicationGuard guard = new PublicationGuard()) { @@ -133,6 +188,38 @@ private static ModifiedColumnSet getModifiedColumnSet(@NotNull final Table table : ((QueryTable) table).newModifiedColumnSet(columns); } + private String topic(ObjectChunk topicChunk, int index) { + if (topicChunk == null) { + return defaultTopic; + } + final CharSequence charSequence = topicChunk.get(index); + return charSequence == null ?
defaultTopic : charSequence.toString(); + } + + private Integer partition(IntChunk partitionChunk, int index) { + if (partitionChunk == null) { + return defaultPartition; + } + final int partition = partitionChunk.get(index); + return partition == QueryConstants.NULL_INT ? defaultPartition : Integer.valueOf(partition); + } + + public static Long timestampMillis(LongChunk nanosChunk, int index) { + if (nanosChunk == null) { + return null; + } + final long nanos = nanosChunk.get(index); + return nanos == QueryConstants.NULL_LONG ? null : TimeUnit.NANOSECONDS.toMillis(nanos); + } + + private static T object(ObjectChunk chunk, int index) { + return chunk == null ? null : chunk.get(index); + } + + private static ChunkSource.GetContext makeGetContext(ColumnSource source, int chunkSize) { + return source == null ? null : source.makeGetContext(chunkSize); + } + private void publishMessages(@NotNull final RowSet rowsToPublish, final boolean usePrevious, final boolean publishValues, @NotNull final PublicationGuard guard) { if (rowsToPublish.isEmpty()) { @@ -142,31 +229,55 @@ private void publishMessages(@NotNull final RowSet rowsToPublish, final boolean final int chunkSize = (int) Math.min(CHUNK_SIZE, rowsToPublish.size()); try (final RowSequence.Iterator rowsIterator = rowsToPublish.getRowSequenceIterator(); - final KeyOrValueSerializer.Context keyContext = - keyChunkSerializer != null ? keyChunkSerializer.makeContext(chunkSize) : null; - final KeyOrValueSerializer.Context valueContext = - publishValues && valueChunkSerializer != null ? valueChunkSerializer.makeContext(chunkSize) - : null) { + final KeyOrValueSerializer.Context keyContext = keyChunkSerializer != null + ? keyChunkSerializer.makeContext(chunkSize) + : null; + final KeyOrValueSerializer.Context valueContext = publishValues && valueChunkSerializer != null + ? valueChunkSerializer.makeContext(chunkSize) + : null; + final ChunkSource.GetContext topicContext = makeGetContext(topicColumnSource, chunkSize); + final ChunkSource.GetContext partitionContext = makeGetContext(partitionColumnSource, chunkSize); + final ChunkSource.GetContext timestampContext = makeGetContext(timestampColumnSource, chunkSize)) { while (rowsIterator.hasMore()) { final RowSequence chunkRowKeys = rowsIterator.getNextRowSequenceWithLength(chunkSize); - final ObjectChunk keyChunk; - if (keyContext != null) { - keyChunk = keyChunkSerializer.handleChunk(keyContext, chunkRowKeys, usePrevious); - } else { - keyChunk = null; - } + final ObjectChunk keyChunk = keyContext == null + ? null + : keyChunkSerializer.handleChunk(keyContext, chunkRowKeys, usePrevious); - final ObjectChunk valueChunk; - if (valueContext != null) { - valueChunk = valueChunkSerializer.handleChunk(valueContext, chunkRowKeys, usePrevious); - } else { - valueChunk = null; - } + final ObjectChunk valueChunk = valueContext == null + ? null + : valueChunkSerializer.handleChunk(valueContext, chunkRowKeys, usePrevious); + + final ObjectChunk topicChunk = topicContext == null + ? null + : (usePrevious + ? topicColumnSource.getPrevChunk(topicContext, chunkRowKeys) + : topicColumnSource.getChunk(topicContext, chunkRowKeys)) + .asObjectChunk(); + + final IntChunk partitionChunk = partitionContext == null + ? null + : (usePrevious + ? partitionColumnSource.getPrevChunk(partitionContext, chunkRowKeys) + : partitionColumnSource.getChunk(partitionContext, chunkRowKeys)) + .asIntChunk(); + + final LongChunk timestampChunk = timestampContext == null + ? null + : (usePrevious + ? 
timestampColumnSource.getPrevChunk(timestampContext, chunkRowKeys) + : timestampColumnSource.getChunk(timestampContext, chunkRowKeys)) + .asLongChunk(); - for (int ii = 0; ii < chunkRowKeys.intSize(); ++ii) { - final ProducerRecord record = new ProducerRecord<>(topic, - keyChunk != null ? keyChunk.get(ii) : null, valueChunk != null ? valueChunk.get(ii) : null); + final int numRecords = chunkRowKeys.intSize(); + for (int ii = 0; ii < numRecords; ++ii) { + final ProducerRecord record = new ProducerRecord<>( + topic(topicChunk, ii), + partition(partitionChunk, ii), + timestampMillis(timestampChunk, ii), + object(keyChunk, ii), + object(valueChunk, ii)); producer.send(record, guard); } } diff --git a/extensions/kafka/src/test/java/io/deephaven/kafka/KafkaPublishOptionsTest.java b/extensions/kafka/src/test/java/io/deephaven/kafka/KafkaPublishOptionsTest.java new file mode 100644 index 00000000000..b6d351e7d33 --- /dev/null +++ b/extensions/kafka/src/test/java/io/deephaven/kafka/KafkaPublishOptionsTest.java @@ -0,0 +1,217 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.kafka; + +import io.deephaven.api.ColumnName; +import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.NoSuchColumnException; +import io.deephaven.engine.util.TableTools; +import io.deephaven.kafka.KafkaTools.Produce; +import org.junit.Test; + +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; + +public class KafkaPublishOptionsTest { + + private static final TableDefinition TD = TableDefinition.of( + ColumnDefinition.ofString("MyTopic"), + ColumnDefinition.ofInt("MyPartition"), + ColumnDefinition.ofTime("MyTimestamp"), + ColumnDefinition.ofString("MyValue")); + + @Test + public void ok() { + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .topic("HotTopic") + .config(new Properties()) + .valueSpec(Produce.simpleSpec("MyValue")) + .build(); + } + + @Test + public void okPartition() { + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .topic("HotTopic") + .partition(123) + .config(new Properties()) + .valueSpec(Produce.simpleSpec("MyValue")) + .build(); + } + + + @Test + public void checkNotBothIgnore() { + try { + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .topic("HotTopic") + .config(new Properties()) + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessage("keySpec and valueSpec can't both be ignore specs"); + } + } + + @Test + public void checkPublishInitial() { + try { + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .topic("HotTopic") + .config(new Properties()) + .valueSpec(Produce.simpleSpec("MyValue")) + .publishInitial(false) + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessage("publishInitial==false && table.isRefreshing() == false"); + } + } + + @Test + public void checkLastBy() { + try { + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .topic("HotTopic") + .config(new Properties()) + .valueSpec(Produce.simpleSpec("MyValue")) + .lastBy(true) + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + 
assertThat(e).hasMessage("Must set a non-ignore keySpec when lastBy() == true"); + } + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .topic("HotTopic") + .config(new Properties()) + .keySpec(Produce.simpleSpec("MyValue")) + .lastBy(true) + .build(); + } + + @Test + public void checkTopic() { + try { + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .config(new Properties()) + .valueSpec(Produce.simpleSpec("MyValue")) + .build(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessage("Must set topic or topicColumn (or both)"); + } + } + + @Test + public void checkTopicColumn() { + try { + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .config(new Properties()) + .valueSpec(Produce.simpleSpec("MyValue")) + .topicColumn(ColumnName.of("DoesNotExist")) + .build(); + failBecauseExceptionWasNotThrown(NoSuchColumnException.class); + } catch (NoSuchColumnException e) { + assertThat(e).hasMessageContaining("Unknown column names [DoesNotExist]"); + } + try { + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .config(new Properties()) + .valueSpec(Produce.simpleSpec("MyValue")) + .topicColumn(ColumnName.of("MyPartition")) + .build(); + failBecauseExceptionWasNotThrown(ClassCastException.class); + } catch (ClassCastException e) { + assertThat(e).hasMessage("Cannot convert [MyPartition] of type int to type java.lang.CharSequence"); + } + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .config(new Properties()) + .valueSpec(Produce.simpleSpec("MyValue")) + .topicColumn(ColumnName.of("MyTopic")) + .build(); + } + + @Test + public void checkPartitionColumn() { + try { + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .topic("HotTopic") + .config(new Properties()) + .valueSpec(Produce.simpleSpec("MyValue")) + .partitionColumn(ColumnName.of("DoesNotExist")) + .build(); + failBecauseExceptionWasNotThrown(NoSuchColumnException.class); + } catch (NoSuchColumnException e) { + assertThat(e).hasMessageContaining("Unknown column names [DoesNotExist]"); + } + try { + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .topic("HotTopic") + .config(new Properties()) + .valueSpec(Produce.simpleSpec("MyValue")) + .partitionColumn(ColumnName.of("MyTopic")) + .build(); + failBecauseExceptionWasNotThrown(ClassCastException.class); + } catch (ClassCastException e) { + assertThat(e).hasMessage("Cannot convert [MyTopic] of type java.lang.String to type int"); + } + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .topic("HotTopic") + .config(new Properties()) + .valueSpec(Produce.simpleSpec("MyValue")) + .partitionColumn(ColumnName.of("MyPartition")) + .build(); + } + + @Test + public void checkTimestampColumn() { + try { + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .topic("HotTopic") + .config(new Properties()) + .valueSpec(Produce.simpleSpec("MyValue")) + .timestampColumn(ColumnName.of("DoesNotExist")) + .build(); + failBecauseExceptionWasNotThrown(NoSuchColumnException.class); + } catch (NoSuchColumnException e) { + assertThat(e).hasMessageContaining("Unknown column names [DoesNotExist]"); + } + try { + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .topic("HotTopic") + .config(new Properties()) + .valueSpec(Produce.simpleSpec("MyValue")) + .timestampColumn(ColumnName.of("MyTopic")) + .build(); + failBecauseExceptionWasNotThrown(ClassCastException.class); + } 
catch (ClassCastException e) { + assertThat(e).hasMessage("Cannot convert [MyTopic] of type java.lang.String to type java.time.Instant"); + } + KafkaPublishOptions.builder() + .table(TableTools.newTable(TD)) + .topic("HotTopic") + .config(new Properties()) + .valueSpec(Produce.simpleSpec("MyValue")) + .timestampColumn(ColumnName.of("MyTimestamp")) + .build(); + } +} diff --git a/py/server/deephaven/stream/kafka/producer.py b/py/server/deephaven/stream/kafka/producer.py index a9e65aae9a1..0b814ee8e8d 100644 --- a/py/server/deephaven/stream/kafka/producer.py +++ b/py/server/deephaven/stream/kafka/producer.py @@ -3,7 +3,7 @@ # """ The kafka.producer module supports publishing Deephaven tables to Kafka streams. """ -from typing import Dict, Callable, List +from typing import Dict, Callable, List, Optional import jpy @@ -17,6 +17,7 @@ _JAvroSchema = jpy.get_type("org.apache.avro.Schema") _JKafkaTools_Produce = jpy.get_type("io.deephaven.kafka.KafkaTools$Produce") _JKafkaPublishOptions = jpy.get_type("io.deephaven.kafka.KafkaPublishOptions") +_JColumnName = jpy.get_type("io.deephaven.api.ColumnName") class KeyValueSpec(JObjectWrapper): @@ -36,20 +37,24 @@ def j_object(self) -> jpy.JType: def produce( table: Table, kafka_config: Dict, - topic: str, + topic: Optional[str], key_spec: KeyValueSpec, value_spec: KeyValueSpec, last_by_key_columns: bool = False, publish_initial: bool = True, + partition: Optional[int] = None, + topic_col: Optional[str] = None, + partition_col: Optional[str] = None, + timestamp_col: Optional[str] = None, ) -> Callable[[], None]: """Produce to Kafka from a Deephaven table. Args: table (Table): the source table to publish to Kafka - kafka_config (Dict): configuration for the associated kafka producer. + kafka_config (Dict): configuration for the associated Kafka producer. This is used to call the constructor of org.apache.kafka.clients.producer.KafkaProducer; pass any KafkaProducer specific desired configuration here - topic (str): the topic name + topic (Optional[str]): the default topic name. When None, topic_col must be set. See topic_col for behavior. key_spec (KeyValueSpec): specifies how to map table column(s) to the Key field in produced Kafka messages. This should be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this module, or the constant KeyValueSpec.IGNORE @@ -61,6 +66,22 @@ def produce( aggregation on table grouped by the input columns of key_spec and publish to Kafka from the result. publish_initial (bool): whether the initial data in table should be published. When False, table.is_refreshing must be True. By default, is True. + partition (Optional[int]): the default partition, None by default. See partition_col for partition behavior. + topic_col (Optional[str]): the topic column, None by default. When set, uses the the given string column from + table as the first source for setting the Kafka record topic. When None, or if the column value is null, topic + will be used. + partition_col (Optional[str]): the partition column, None by default. When set, uses the the given int column + from table as the first source for setting the Kafka record partition. When None, or if the column value is null, + partition will be used if present. If a valid partition number is specified, that partition will be used + when sending the record. Otherwise, Kafka will choose a partition using a hash of the key if the key is present, + or will assign a partition in a round-robin fashion if the key is not present. 
+ timestamp_col (Optional[str]): the timestamp column, None by default. When set, uses the the given timestamp + column from table as the first source for setting the Kafka record timestamp. When None, or if the column value + is null, the producer will stamp the record with its current time. The timestamp eventually used by Kafka + depends on the timestamp type configured for the topic. If the topic is configured to use CreateTime, the + timestamp in the producer record will be used by the broker. If the topic is configured to use LogAppendTime, + the timestamp in the producer record will be overwritten by the broker with the broker local time when it + appends the message to its log. Returns: a callback that, when invoked, stops publishing and cleans up subscriptions and resources. @@ -77,20 +98,28 @@ def produce( ) if not publish_initial and not table.is_refreshing: raise ValueError("publish_initial == False and table.is_refreshing == False") - options = ( + options_builder = ( _JKafkaPublishOptions.builder() .table(table.j_table) - .topic(topic) .config(j_properties(kafka_config)) .keySpec(key_spec.j_object) .valueSpec(value_spec.j_object) .lastBy(last_by_key_columns and key_spec is not KeyValueSpec.IGNORE) .publishInitial(publish_initial) - .build() ) + if topic: + options_builder.topic(topic) + if partition: + options_builder.partition(partition) + if topic_col: + options_builder.topicColumn(_JColumnName.of(topic_col)) + if partition_col: + options_builder.partitionColumn(_JColumnName.of(partition_col)) + if timestamp_col: + options_builder.timestampColumn(_JColumnName.of(timestamp_col)) with auto_locking_ctx(table): - runnable = _JKafkaTools.produceFromTable(options) + runnable = _JKafkaTools.produceFromTable(options_builder.build()) def cleanup(): try: diff --git a/py/server/tests/test_kafka_producer.py b/py/server/tests/test_kafka_producer.py index 9b806d702db..d71b8ca8960 100644 --- a/py/server/tests/test_kafka_producer.py +++ b/py/server/tests/test_kafka_producer.py @@ -4,9 +4,10 @@ import os import unittest +from datetime import datetime from deephaven import kafka_producer as pk, new_table, time_table -from deephaven.column import string_col, int_col, double_col +from deephaven.column import string_col, int_col, double_col, datetime_col from deephaven.stream import kafka from deephaven.stream.kafka.producer import KeyValueSpec from tests.testbase import BaseTestCase @@ -50,6 +51,126 @@ def test_simple_spec(self): self.assertIsNotNone(cleanup) cleanup() + def test_simple_spec_topic_col_no_default_topic(self): + """ + Check a simple Kafka producer works with a topic column but no default topic + """ + t = new_table(cols=[ + string_col('Topic', ['orders_a', 'orders_b', 'orders_a', 'orders_b']), + double_col('Price', [10.0, 10.5, 11.0, 11.5]) + ]) + cleanup = pk.produce( + t, + {'bootstrap.servers': 'redpanda:29092'}, + None, + key_spec=KeyValueSpec.IGNORE, + value_spec=pk.simple_spec('Price'), + topic_col='Topic' + ) + + self.assertIsNotNone(cleanup) + cleanup() + + def test_simple_spec_topic_col_default_topic(self): + """ + Check a simple Kafka producer works with a topic column and a default topic + """ + t = new_table(cols=[ + string_col('Topic', ['orders_a', None, 'orders_a', 'orders_b']), + double_col('Price', [10.0, 10.5, 11.0, 11.5]) + ]) + cleanup = pk.produce( + t, + {'bootstrap.servers': 'redpanda:29092'}, + 'orders', + key_spec=KeyValueSpec.IGNORE, + value_spec=pk.simple_spec('Price'), + topic_col='Topic' + ) + + self.assertIsNotNone(cleanup) + cleanup() + + def 
test_simple_spec_default_partition(self): + """ + Check a simple Kafka producer works with a default partition + """ + t = new_table(cols=[ + double_col('Price', [10.0, 10.5, 11.0, 11.5])] + ) + cleanup = pk.produce( + t, + {'bootstrap.servers': 'redpanda:29092'}, + "orders", + key_spec=KeyValueSpec.IGNORE, + value_spec=pk.simple_spec('Price'), + partition=0 + ) + + self.assertIsNotNone(cleanup) + cleanup() + + def test_simple_spec_partition_col_no_default_partition(self): + """ + Check a simple Kafka producer works with a partition column + """ + t = new_table(cols=[ + int_col('Partition', [0, 0, 0, 0]), + double_col('Price', [10.0, 10.5, 11.0, 11.5]) + ]) + cleanup = pk.produce( + t, + {'bootstrap.servers': 'redpanda:29092'}, + "orders", + key_spec=KeyValueSpec.IGNORE, + value_spec=pk.simple_spec('Price'), + partition_col='Partition' + ) + + self.assertIsNotNone(cleanup) + cleanup() + + def test_simple_spec_partition_col_default_partition(self): + """ + Check a simple Kafka producer works with a partition column and default partition + """ + t = new_table(cols=[ + int_col('Partition', [0, 0, None, 0]), + double_col('Price', [10.0, 10.5, 11.0, 11.5]) + ]) + cleanup = pk.produce( + t, + {'bootstrap.servers': 'redpanda:29092'}, + "orders", + key_spec=KeyValueSpec.IGNORE, + value_spec=pk.simple_spec('Price'), + partition=0, + partition_col='Partition' + ) + + self.assertIsNotNone(cleanup) + cleanup() + + def test_simple_spec_timestamp_col(self): + """ + Check a simple Kafka producer works with a timestamp column + """ + t = new_table(cols=[ + datetime_col('Timestamp', [datetime.now(), datetime.now(), None, datetime.now()]), + double_col('Price', [10.0, 10.5, 11.0, 11.5]) + ]) + cleanup = pk.produce( + t, + {'bootstrap.servers': 'redpanda:29092'}, + "orders", + key_spec=KeyValueSpec.IGNORE, + value_spec=pk.simple_spec('Price'), + timestamp_col='Timestamp' + ) + + self.assertIsNotNone(cleanup) + cleanup() + def test_json_spec_only_columns(self): t = table_helper() cleanup = pk.produce(