Move stuff to stream publisher, use spec for mongo.
cpwright committed Dec 22, 2023
1 parent a9a0a6f commit d50fb07
Showing 49 changed files with 390 additions and 1,520 deletions.
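For orientation: this commit converts Consume.KeyOrValueSpec from an abstract base class into an interface (the `extends` → `implements` and `protected` → `public` changes below) and moves KeyOrValueProcessor and KeyOrValueIngestData into the io.deephaven.streampublisher package. The shape of the contract can be sketched from the overrides visible in the hunks that follow; this is a reconstruction from the visible signatures, not the verbatim interface (the real one is nested in KafkaTools.Consume and may declare additional members):

import java.util.List;
import java.util.Map;
import java.util.Optional;

import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.kafka.KafkaTools.KeyOrValue;
import io.deephaven.streampublisher.KeyOrValueIngestData;
import io.deephaven.streampublisher.KeyOrValueProcessor;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.kafka.common.serialization.Deserializer;

// Sketch of the contract implied by this diff; nested in KafkaTools.Consume in the real code.
public interface KeyOrValueSpec {
    // Optional schema-registry integration; Optional.empty() when unused.
    Optional<SchemaProvider> getSchemaProvider();

    // Kafka deserializer for this key or value side.
    Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
            Map<String, ?> configs);

    // Discovers column definitions for this side, appending to columnDefinitionsOut.
    KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
            Map<String, ?> configs, MutableInt nextColumnIndexMut, List<ColumnDefinition<?>> columnDefinitionsOut);

    // Converts deserialized objects into chunks for the stream publisher.
    KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data);
}

The AvroConsume and IgnoreConsume changes below are two implementations of this contract.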
16 changes: 8 additions & 8 deletions extensions/kafka/src/main/java/io/deephaven/kafka/AvroImpl.java
@@ -18,14 +18,14 @@
 import io.deephaven.engine.util.BigDecimalUtils;
 import io.deephaven.kafka.KafkaTools.Consume;
 import io.deephaven.kafka.KafkaTools.KeyOrValue;
-import io.deephaven.kafka.KafkaTools.KeyOrValueIngestData;
 import io.deephaven.kafka.KafkaTools.Produce;
 import io.deephaven.kafka.ingest.GenericRecordChunkAdapter;
-import io.deephaven.kafka.ingest.KeyOrValueProcessor;
+import io.deephaven.streampublisher.KeyOrValueProcessor;
 import io.deephaven.kafka.publish.GenericRecordKeyOrValueSerializer;
 import io.deephaven.kafka.publish.KeyOrValueSerializer;
 import io.deephaven.qst.type.Type;
 import io.deephaven.stream.StreamChunkUtils;
+import io.deephaven.streampublisher.KeyOrValueIngestData;
 import io.deephaven.vector.ByteVector;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
@@ -62,7 +62,7 @@ class AvroImpl {

     private static final Type<Utf8> utf8Type = Type.find(Utf8.class);
 
-    static final class AvroConsume extends Consume.KeyOrValueSpec {
+    static final class AvroConsume implements Consume.KeyOrValueSpec {
         private static final Pattern NESTED_FIELD_NAME_SEPARATOR_PATTERN =
                 Pattern.compile(Pattern.quote(NESTED_FIELD_NAME_SEPARATOR));
 
@@ -105,15 +105,15 @@ public Optional<SchemaProvider> getSchemaProvider() {
         }
 
         @Override
-        protected Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
+        public Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
                 Map<String, ?> configs) {
             return new KafkaAvroDeserializer(Objects.requireNonNull(schemaRegistryClient));
         }
 
         @Override
-        protected KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
-                SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs, MutableInt nextColumnIndexMut,
-                List<ColumnDefinition<?>> columnDefinitionsOut) {
+        public KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
+                SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs, MutableInt nextColumnIndexMut,
+                List<ColumnDefinition<?>> columnDefinitionsOut) {
             KeyOrValueIngestData data = new KeyOrValueIngestData();
             data.fieldPathToColumnName = new HashMap<>();
             final Schema localSchema = schema != null
@@ -126,7 +126,7 @@ protected KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
         }
 
         @Override
-        protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
+        public KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
             return GenericRecordChunkAdapter.make(
                     tableDef,
                     ci -> StreamChunkUtils.chunkTypeForColumnIndex(tableDef, ci),
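An aside on the protected-to-public changes above: interface members are implicitly public in Java, and an override may never narrow visibility, so converting the base class to an interface forces every implementing method to become public. A minimal, self-contained illustration of the language rule (hypothetical names, not from this commit):

interface Spec {
    String describe(); // implicitly public abstract
}

class Impl implements Spec {
    @Override
    public String describe() { // declaring this protected would not compile
        return "ok";
    }
}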
extensions/kafka/src/main/java/io/deephaven/kafka/IgnoreImpl.java

@@ -10,10 +10,10 @@
 import io.deephaven.engine.table.TableDefinition;
 import io.deephaven.kafka.KafkaTools.Consume;
 import io.deephaven.kafka.KafkaTools.KeyOrValue;
-import io.deephaven.kafka.KafkaTools.KeyOrValueIngestData;
 import io.deephaven.kafka.KafkaTools.Produce;
-import io.deephaven.kafka.ingest.KeyOrValueProcessor;
+import io.deephaven.streampublisher.KeyOrValueProcessor;
 import io.deephaven.kafka.publish.KeyOrValueSerializer;
+import io.deephaven.streampublisher.KeyOrValueIngestData;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -26,28 +26,28 @@
 import java.util.Optional;
 
 class IgnoreImpl {
-    static final class IgnoreConsume extends Consume.KeyOrValueSpec {
+    static final class IgnoreConsume implements Consume.KeyOrValueSpec {
 
         @Override
         public Optional<SchemaProvider> getSchemaProvider() {
             return Optional.empty();
         }
 
         @Override
-        protected Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
+        public Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
                 Map<String, ?> configs) {
             return new ByteArrayDeserializer();
         }
 
         @Override
-        protected KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
-                SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs, MutableInt nextColumnIndexMut,
-                List<ColumnDefinition<?>> columnDefinitionsOut) {
+        public KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue,
+                SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs, MutableInt nextColumnIndexMut,
+                List<ColumnDefinition<?>> columnDefinitionsOut) {
             return null;
         }
 
         @Override
-        protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
+        public KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
             return null;
         }
     }
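IgnoreConsume returning null from both getIngestData and getProcessor suggests a contract where an ignored key or value contributes no columns, so a caller skips that side entirely. A hypothetical caller-side sketch (the wiring method and its name are assumed, not part of this commit; only the spec methods shown in this diff are real):

// Hypothetical orchestration of one key-or-value side.
static void wireSide(Consume.KeyOrValueSpec spec, KeyOrValue side,
        SchemaRegistryClient client, Map<String, ?> configs,
        MutableInt nextColumnIndex, List<ColumnDefinition<?>> columnsOut,
        TableDefinition tableDef) {
    final KeyOrValueIngestData data =
            spec.getIngestData(side, client, configs, nextColumnIndex, columnsOut);
    if (data == null) {
        return; // IgnoreConsume: nothing to ingest for this side
    }
    final KeyOrValueProcessor processor = spec.getProcessor(tableDef, data);
    // ... hand the processor to the stream publisher ...
}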
236 changes: 0 additions & 236 deletions extensions/kafka/src/main/java/io/deephaven/kafka/JsonImpl.java

This file was deleted.
