diff --git a/engine/api/src/main/java/io/deephaven/engine/table/Table.java b/engine/api/src/main/java/io/deephaven/engine/table/Table.java index 6d606f464ac..02320d3b8e4 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/Table.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/Table.java @@ -219,6 +219,8 @@ public interface Table extends String BARRAGE_PERFORMANCE_KEY_ATTRIBUTE = "BarragePerformanceTableKey"; /** * Set an Apache Arrow POJO Schema to this attribute to control the column encoding used for barrage serialization. + *

+ * See {@code org.apache.arrow.vector.types.pojo.Schema}. */ String BARRAGE_SCHEMA_ATTRIBUTE = "BarrageSchema"; diff --git a/extensions/barrage/BarrageTypeMapping.md b/extensions/barrage/BarrageTypeMapping.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReader.java index b9c1584a0b9..6405d8689d4 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkReader.java @@ -23,9 +23,9 @@ * data, supporting various data types and logical structures. This interface is part of the Deephaven Barrage * extensions for handling streamed data ingestion. * - * @param The type of chunk being read, extending {@link WritableChunk} with {@link Values}. + * @param The type of chunk being read, extending {@link WritableChunk} with {@link Values}. */ -public interface ChunkReader> { +public interface ChunkReader> { /** * Supports creation of {@link ChunkReader} instances to use when processing a flight stream. JVM implementations @@ -55,7 +55,7 @@ > ChunkReader newReader( * @throws IOException if an error occurred while reading the stream */ @FinalDefault - default ReadChunkType readChunk( + default READ_CHUNK_TYPE readChunk( @NotNull Iterator fieldNodeIter, @NotNull PrimitiveIterator.OfLong bufferInfoIter, @NotNull DataInput is) throws IOException { @@ -74,7 +74,7 @@ default ReadChunkType readChunk( * @return a Chunk containing the data from the stream * @throws IOException if an error occurred while reading the stream */ - ReadChunkType readChunk( + READ_CHUNK_TYPE readChunk( @NotNull Iterator fieldNodeIter, @NotNull PrimitiveIterator.OfLong bufferInfoIter, @NotNull DataInput is, diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java index 3c52e581489..14c51a74c4d 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReaderFactory.java @@ -648,6 +648,9 @@ private static ChunkReader> decimalToBig } BigInteger unscaledValue = new BigInteger(value); + if (scale == 0) { + return unscaledValue; + } return unscaledValue.divide(BigInteger.TEN.pow(scale)); }); } @@ -702,17 +705,22 @@ private static ChunkReader> intToByte( final BarrageOptions options) { final ArrowType.Int intType = (ArrowType.Int) arrowType; final int bitWidth = intType.getBitWidth(); + final boolean unsigned = !intType.getIsSigned(); switch (bitWidth) { case 8: - // note unsigned mappings to byte will overflow byte; but user has asked for this + // note unsigned mappings to byte will overflow; but user has asked for this return new ByteChunkReader(options); case 16: - // note shorts may overflow byte; but user has asked for this + if (unsigned) { + // note shorts may overflow; but user has asked for this + return ByteChunkReader.transformTo(new CharChunkReader(options), + (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii))); + } return ByteChunkReader.transformTo(new ShortChunkReader(options), (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii))); case 32: - // note ints may overflow byte; but user has asked for this + // note ints may overflow; but user has asked for this return ByteChunkReader.transformTo(new IntChunkReader(options), (chunk, ii) -> QueryLanguageFunctionUtils.byteCast(chunk.get(ii))); case 64: @@ -735,17 +743,19 @@ private static ChunkReader> intToShort( switch (bitWidth) { case 8: return ShortChunkReader.transformTo(new ByteChunkReader(options), - (chunk, ii) -> maskIfOverflow(unsigned, - Byte.BYTES, QueryLanguageFunctionUtils.shortCast(chunk.get(ii)))); + (chunk, ii) -> maskIfOverflow(unsigned, QueryLanguageFunctionUtils.shortCast(chunk.get(ii)))); case 16: - // note unsigned mappings to short will overflow short; but user has asked for this + if (unsigned) { + return ShortChunkReader.transformTo(new CharChunkReader(options), + (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii))); + } return new ShortChunkReader(options); case 32: - // note ints may overflow short; but user has asked for this + // note ints may overflow; but user has asked for this return ShortChunkReader.transformTo(new IntChunkReader(options), (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii))); case 64: - // note longs may overflow short; but user has asked for this + // note longs may overflow; but user has asked for this return ShortChunkReader.transformTo(new LongChunkReader(options), (chunk, ii) -> QueryLanguageFunctionUtils.shortCast(chunk.get(ii))); default: @@ -767,6 +777,10 @@ private static ChunkReader> intToInt( (chunk, ii) -> maskIfOverflow(unsigned, Byte.BYTES, QueryLanguageFunctionUtils.intCast(chunk.get(ii)))); case 16: + if (unsigned) { + return IntChunkReader.transformTo(new CharChunkReader(options), + (chunk, ii) -> QueryLanguageFunctionUtils.intCast(chunk.get(ii))); + } return IntChunkReader.transformTo(new ShortChunkReader(options), (chunk, ii) -> maskIfOverflow(unsigned, Short.BYTES, QueryLanguageFunctionUtils.intCast(chunk.get(ii)))); case 32: @@ -795,6 +809,10 @@ private static ChunkReader> intToLong( (chunk, ii) -> maskIfOverflow(unsigned, Byte.BYTES, QueryLanguageFunctionUtils.longCast(chunk.get(ii)))); case 16: + if (unsigned) { + return LongChunkReader.transformTo(new CharChunkReader(options), + (chunk, ii) -> QueryLanguageFunctionUtils.longCast(chunk.get(ii))); + } return LongChunkReader.transformTo(new ShortChunkReader(options), (chunk, ii) -> maskIfOverflow(unsigned, Short.BYTES, QueryLanguageFunctionUtils.longCast(chunk.get(ii)))); @@ -822,6 +840,10 @@ private static ChunkReader> intToBigInt( return transformToObject(new ByteChunkReader(options), (chunk, ii) -> toBigInt(maskIfOverflow( unsigned, Byte.BYTES, QueryLanguageFunctionUtils.longCast(chunk.get(ii))))); case 16: + if (unsigned) { + return transformToObject(new CharChunkReader(options), + (chunk, ii) -> toBigInt(QueryLanguageFunctionUtils.longCast(chunk.get(ii)))); + } return transformToObject(new ShortChunkReader(options), (chunk, ii) -> toBigInt(maskIfOverflow( unsigned, Short.BYTES, QueryLanguageFunctionUtils.longCast(chunk.get(ii))))); case 32: @@ -848,6 +870,10 @@ private static ChunkReader> intToFloat( return FloatChunkReader.transformTo(new ByteChunkReader(options), (chunk, ii) -> floatCast(Byte.BYTES, signed, chunk.isNull(ii), chunk.get(ii))); case 16: + if (!signed) { + return FloatChunkReader.transformTo(new CharChunkReader(options), + (chunk, ii) -> floatCast(Character.BYTES, signed, chunk.isNull(ii), chunk.get(ii))); + } return FloatChunkReader.transformTo(new ShortChunkReader(options), (chunk, ii) -> floatCast(Short.BYTES, signed, chunk.isNull(ii), chunk.get(ii))); case 32: @@ -898,6 +924,10 @@ private static ChunkReader> intToDouble( return DoubleChunkReader.transformTo(new ByteChunkReader(options), (chunk, ii) -> doubleCast(Byte.BYTES, signed, chunk.isNull(ii), chunk.get(ii))); case 16: + if (!signed) { + return DoubleChunkReader.transformTo(new CharChunkReader(options), + (chunk, ii) -> doubleCast(Character.BYTES, signed, chunk.isNull(ii), chunk.get(ii))); + } return DoubleChunkReader.transformTo(new ShortChunkReader(options), (chunk, ii) -> doubleCast(Short.BYTES, signed, chunk.isNull(ii), chunk.get(ii))); case 32: @@ -948,6 +978,10 @@ private static ChunkReader> intToBigDeci return transformToObject(new ByteChunkReader(options), (chunk, ii) -> toBigDecimal(maskIfOverflow( unsigned, Byte.BYTES, QueryLanguageFunctionUtils.longCast(chunk.get(ii))))); case 16: + if (unsigned) { + return transformToObject(new CharChunkReader(options), (chunk, ii) -> toBigDecimal(maskIfOverflow( + unsigned, Character.BYTES, QueryLanguageFunctionUtils.longCast(chunk.get(ii))))); + } return transformToObject(new ShortChunkReader(options), (chunk, ii) -> toBigDecimal(maskIfOverflow( unsigned, Short.BYTES, QueryLanguageFunctionUtils.longCast(chunk.get(ii))))); case 32: @@ -983,11 +1017,11 @@ private static ChunkReader> intToChar( (chunk, ii) -> QueryLanguageFunctionUtils.charCast(chunk.get(ii))); } case 32: - // note unsigned mappings to char will overflow short; but user has asked for this + // note int mappings to char will overflow; but user has asked for this return CharChunkReader.transformTo(new IntChunkReader(options), (chunk, ii) -> QueryLanguageFunctionUtils.charCast(chunk.get(ii))); case 64: - // note unsigned mappings to short will overflow short; but user has asked for this + // note long mappings to short will overflow; but user has asked for this return CharChunkReader.transformTo(new LongChunkReader(options), (chunk, ii) -> QueryLanguageFunctionUtils.charCast(chunk.get(ii))); default: @@ -1248,16 +1282,17 @@ private static BigDecimal toBigDecimal(final long value) { *

* Special handling is included to preserve the value of null-equivalent constants and to skip masking for signed * values. + *

+ * Note that short can only be sign extended from byte so we don't need to consider other numByte configurations. * * @param unsigned Whether the value should be treated as unsigned. - * @param numBytes The number of bytes to constrain the value to (e.g., 1 for byte, 2 for short). * @param value The input value to potentially mask. * @return The masked value if unsigned and overflow occurs; otherwise, the original value. */ @SuppressWarnings("SameParameterValue") - private static short maskIfOverflow(final boolean unsigned, final int numBytes, short value) { + private static short maskIfOverflow(final boolean unsigned, short value) { if (unsigned && value != QueryConstants.NULL_SHORT) { - value &= (short) ((1L << (numBytes * 8)) - 1); + value &= (short) ((1L << 8) - 1); } return value; } @@ -1332,13 +1367,13 @@ private static BigInteger maskIfOverflow(final boolean unsigned, final int numBy return value; } - private interface ToObjectTransformFunction> { - T get(WireChunkType wireValues, int wireOffset); + private interface ToObjectTransformFunction> { + T get(WIRE_CHUNK_TYPE wireValues, int wireOffset); } - private static , CR extends ChunkReader> ChunkReader> transformToObject( + private static , CR extends ChunkReader> ChunkReader> transformToObject( final CR wireReader, - final ToObjectTransformFunction wireTransform) { + final ToObjectTransformFunction wireTransform) { return new TransformingChunkReader<>( wireReader, WritableObjectChunk::makeWritableChunk, diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java index 24f08c37e83..f3cf9ea748b 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkWriterFactory.java @@ -15,6 +15,7 @@ import io.deephaven.chunk.ObjectChunk; import io.deephaven.chunk.ShortChunk; import io.deephaven.chunk.WritableByteChunk; +import io.deephaven.chunk.WritableCharChunk; import io.deephaven.chunk.WritableDoubleChunk; import io.deephaven.chunk.WritableFloatChunk; import io.deephaven.chunk.WritableIntChunk; @@ -806,6 +807,15 @@ private static ChunkWriter> intFromByte( case 8: return ByteChunkWriter.getIdentity(typeInfo.arrowField().isNullable()); case 16: + if (!intType.getIsSigned()) { + return new CharChunkWriter<>((ByteChunk source) -> { + final WritableCharChunk chunk = WritableCharChunk.makeWritableChunk(source.size()); + for (int ii = 0; ii < source.size(); ++ii) { + chunk.set(ii, QueryLanguageFunctionUtils.charCast(source.get(ii))); + } + return chunk; + }, ByteChunk::getEmptyChunk, typeInfo.arrowField().isNullable()); + } return new ShortChunkWriter<>((ByteChunk source) -> { final WritableShortChunk chunk = WritableShortChunk.makeWritableChunk(source.size()); for (int ii = 0; ii < source.size(); ++ii) { @@ -849,6 +859,15 @@ private static ChunkWriter> intFromShort( return chunk; }, ShortChunk::getEmptyChunk, typeInfo.arrowField().isNullable()); case 16: + if (!intType.getIsSigned()) { + return new CharChunkWriter<>((ShortChunk source) -> { + final WritableCharChunk chunk = WritableCharChunk.makeWritableChunk(source.size()); + for (int ii = 0; ii < source.size(); ++ii) { + chunk.set(ii, QueryLanguageFunctionUtils.charCast(source.get(ii))); + } + return chunk; + }, ShortChunk::getEmptyChunk, typeInfo.arrowField().isNullable()); + } return ShortChunkWriter.getIdentity(typeInfo.arrowField().isNullable()); case 32: return new IntChunkWriter<>((ShortChunk source) -> { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java index d91a85a88c2..15a4956cd8b 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java @@ -28,13 +28,13 @@ public class DoubleChunkReader extends BaseChunkReader> { private static final String DEBUG_NAME = "DoubleChunkReader"; - public interface ToDoubleTransformFunction> { - double get(WireChunkType wireValues, int wireOffset); + public interface ToDoubleTransformFunction> { + double get(WIRE_CHUNK_TYPE wireValues, int wireOffset); } - public static , T extends ChunkReader> ChunkReader> transformTo( + public static , T extends ChunkReader> ChunkReader> transformTo( final T wireReader, - final ToDoubleTransformFunction wireTransform) { + final ToDoubleTransformFunction wireTransform) { return new TransformingChunkReader<>( wireReader, WritableDoubleChunk::makeWritableChunk, diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkReader.java index 5008c2258ee..54a46fe0e37 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkReader.java @@ -24,13 +24,13 @@ public class FloatChunkReader extends BaseChunkReader> { private static final String DEBUG_NAME = "FloatChunkReader"; - public interface ToFloatTransformFunction> { - float get(WireChunkType wireValues, int wireOffset); + public interface ToFloatTransformFunction> { + float get(WIRE_CHUNK_TYPE wireValues, int wireOffset); } - public static , T extends ChunkReader> ChunkReader> transformTo( + public static , T extends ChunkReader> ChunkReader> transformTo( final T wireReader, - final ToFloatTransformFunction wireTransform) { + final ToFloatTransformFunction wireTransform) { return new TransformingChunkReader<>( wireReader, WritableFloatChunk::makeWritableChunk, diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/NullChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/NullChunkReader.java index a3b45dc837b..9b8832f7f03 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/NullChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/NullChunkReader.java @@ -14,7 +14,7 @@ import java.util.Iterator; import java.util.PrimitiveIterator; -public class NullChunkReader> extends BaseChunkReader { +public class NullChunkReader> extends BaseChunkReader { private final ChunkType resultType; @@ -23,7 +23,7 @@ public NullChunkReader(Class destType) { } @Override - public ReadChunkType readChunk( + public READ_CHUNK_TYPE readChunk( @NotNull final Iterator fieldNodeIter, @NotNull final PrimitiveIterator.OfLong bufferInfoIter, @NotNull final DataInput is, @@ -42,6 +42,6 @@ public ReadChunkType readChunk( chunk.fillWithNullValue(0, nodeInfo.numElements); // noinspection unchecked - return (ReadChunkType) chunk; + return (READ_CHUNK_TYPE) chunk; } } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/TransformingChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/TransformingChunkReader.java index 6689e9e45ac..5aacd5e1fd7 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/TransformingChunkReader.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/TransformingChunkReader.java @@ -19,26 +19,26 @@ /** * A {@link ChunkReader} that reads a chunk of wire values and transforms them into a different chunk type. * - * @param the input chunk type - * @param the output chunk type + * @param the input chunk type + * @param the output chunk type */ -public class TransformingChunkReader, OutputChunkType extends WritableChunk> - extends BaseChunkReader { +public class TransformingChunkReader, OUTPUT_CHUNK_TYPE extends WritableChunk> + extends BaseChunkReader { - public interface TransformFunction, OutputChunkType extends WritableChunk> { - void apply(InputChunkType wireValues, OutputChunkType outChunk, int wireOffset, int outOffset); + public interface TransformFunction, OUTPUT_CHUNK_TYPE extends WritableChunk> { + void apply(INPUT_CHUNK_TYPE wireValues, OUTPUT_CHUNK_TYPE outChunk, int wireOffset, int outOffset); } - private final ChunkReader wireChunkReader; - private final IntFunction chunkFactory; - private final Function, OutputChunkType> castFunction; - private final TransformFunction transformFunction; + private final ChunkReader wireChunkReader; + private final IntFunction chunkFactory; + private final Function, OUTPUT_CHUNK_TYPE> castFunction; + private final TransformFunction transformFunction; public TransformingChunkReader( - @NotNull final ChunkReader wireChunkReader, - final IntFunction chunkFactory, - final Function, OutputChunkType> castFunction, - final TransformFunction transformFunction) { + @NotNull final ChunkReader wireChunkReader, + final IntFunction chunkFactory, + final Function, OUTPUT_CHUNK_TYPE> castFunction, + final TransformFunction transformFunction) { this.wireChunkReader = wireChunkReader; this.chunkFactory = chunkFactory; this.castFunction = castFunction; @@ -46,15 +46,15 @@ public TransformingChunkReader( } @Override - public OutputChunkType readChunk( + public OUTPUT_CHUNK_TYPE readChunk( @NotNull final Iterator fieldNodeIter, @NotNull final PrimitiveIterator.OfLong bufferInfoIter, @NotNull final DataInput is, @Nullable final WritableChunk outChunk, final int outOffset, final int totalRows) throws IOException { - try (final InputChunkType wireValues = wireChunkReader.readChunk(fieldNodeIter, bufferInfoIter, is)) { - final OutputChunkType chunk = castOrCreateChunk( + try (final INPUT_CHUNK_TYPE wireValues = wireChunkReader.readChunk(fieldNodeIter, bufferInfoIter, is)) { + final OUTPUT_CHUNK_TYPE chunk = castOrCreateChunk( outChunk, Math.max(totalRows, wireValues.size()), chunkFactory, castFunction); if (outChunk == null) { // if we're not given an output chunk then we better be writing at the front of the new one diff --git a/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/chunk/BarrageColumnRoundTripTest.java b/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/chunk/BarrageColumnRoundTripTest.java index 078679c5d21..9c487d6e7b8 100644 --- a/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/chunk/BarrageColumnRoundTripTest.java +++ b/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/chunk/BarrageColumnRoundTripTest.java @@ -231,7 +231,8 @@ public void testLongChunkSerialization() throws IOException { } } - private static void longIdentityValidator(WritableChunk utO, WritableChunk utC, RowSequence subset, int offset) { + private static void longIdentityValidator(WritableChunk utO, WritableChunk utC, RowSequence subset, + int offset) { final WritableLongChunk original = utO.asWritableLongChunk(); final WritableLongChunk computed = utC.asWritableLongChunk(); if (subset == null) { diff --git a/go/pkg/client/example_import_table_test.go b/go/pkg/client/example_import_table_test.go index a1310e13e57..d0ee1c27629 100644 --- a/go/pkg/client/example_import_table_test.go +++ b/go/pkg/client/example_import_table_test.go @@ -93,7 +93,7 @@ func Example_importTable() { // metadata: ["deephaven:isDateFormat": "false", "deephaven:isNumberFormat": "false", "deephaven:isPartitioning": "false", "deephaven:isRowStyle": "false", "deephaven:isSortable": "true", "deephaven:isStyle": "false", "deephaven:type": "float"] // - Volume: type=int32, nullable // metadata: ["deephaven:isDateFormat": "false", "deephaven:isNumberFormat": "false", "deephaven:isPartitioning": "false", "deephaven:isRowStyle": "false", "deephaven:isSortable": "true", "deephaven:isStyle": "false", "deephaven:type": "int"] - // metadata: ["deephaven:attribute.AddOnly": "true", "deephaven:attribute.AppendOnly": "true", "deephaven:attribute.SortedColumns": "Close=Ascending", "deephaven:attribute_type.AddOnly": "java.lang.Boolean", "deephaven:attribute_type.AppendOnly": "java.lang.Boolean", "deephaven:attribute_type.SortedColumns": "java.lang.String"] + // metadata: ["deephaven:attribute.AddOnly": "true", "deephaven:attribute.AppendOnly": "true", "deephaven:attribute.SortedColumns": "Close=Ascending", "deephaven:attribute_type.AddOnly": "java.lang.Boolean", "deephaven:attribute_type.AppendOnly": "java.lang.Boolean", "deephaven:attribute_type.SortedColumns": "java.lang.String", "deephaven:unsent.attribute.BarrageSchema": ""] // rows: 5 // col[0][Ticker]: ["IBM" "XRX" "XYZZY" "GME" "ZNGA"] // col[1][Close]: [38.7 53.8 88.5 453 544.9] diff --git a/go/pkg/client/example_table_ops_test.go b/go/pkg/client/example_table_ops_test.go index 2c8e22d02df..00a55e8efb7 100644 --- a/go/pkg/client/example_table_ops_test.go +++ b/go/pkg/client/example_table_ops_test.go @@ -34,7 +34,7 @@ func Example_tableOps() { fmt.Println(queryResult) - // Output: + // Output: // Data Before: // record: // schema: @@ -47,7 +47,7 @@ func Example_tableOps() { // col[1][Close]: [53.8 88.5 38.7 453 26.7 544.9 13.4] // col[2][Volume]: [87000 6060842 138000 138000000 19000 48300 1500] // - // Data After: + // New data: // record: // schema: // fields: 3 @@ -57,39 +57,28 @@ func Example_tableOps() { // metadata: ["deephaven:isDateFormat": "false", "deephaven:isNumberFormat": "false", "deephaven:isPartitioning": "false", "deephaven:isRowStyle": "false", "deephaven:isSortable": "true", "deephaven:isStyle": "false", "deephaven:type": "float"] // - Volume: type=int32, nullable // metadata: ["deephaven:isDateFormat": "false", "deephaven:isNumberFormat": "false", "deephaven:isPartitioning": "false", "deephaven:isRowStyle": "false", "deephaven:isSortable": "true", "deephaven:isStyle": "false", "deephaven:type": "int"] - // metadata: ["deephaven:attribute.AddOnly": "true", "deephaven:attribute.AppendOnly": "true", "deephaven:attribute.SortedColumns": "Close=Ascending", "deephaven:attribute_type.AddOnly": "java.lang.Boolean", "deephaven:attribute_type.AppendOnly": "java.lang.Boolean", "deephaven:attribute_type.SortedColumns": "java.lang.String", "deephaven:unsent.attribute.BarrageSchema": ""] + // metadata: ["deephaven:attribute.AddOnly": "true", "deephaven:attribute.AppendOnly": "true", "deephaven:attribute_type.AddOnly": "java.lang.Boolean", "deephaven:attribute_type.AppendOnly": "java.lang.Boolean", "deephaven:unsent.attribute.BarrageSchema": ""] // rows: 5 - // col[0][Ticker]: ["IBM" "XRX" "XYZZY" "GME" "ZNGA"] - // col[1][Close]: [38.7 53.8 88.5 453 544.9] - // col[2][Volume]: [138000 87000 6060842 138000000 48300] - // want: - // Data Before: - // record: - // schema: - // fields: 3 - // - Ticker: type=utf8, nullable - // - Close: type=float32, nullable - // - Volume: type=int32, nullable - // rows: 7 - // col[0][Ticker]: ["XRX" "XYZZY" "IBM" "GME" "AAPL" "ZNGA" "T"] - // col[1][Close]: [53.8 88.5 38.7 453 26.7 544.9 13.4] - // col[2][Volume]: [87000 6060842 138000 138000000 19000 48300 1500] + // col[0][Ticker]: ["XRX" "IBM" "GME" "AAPL" "ZNGA"] + // col[1][Close]: [53.8 38.7 453 26.7 544.9] + // col[2][Volume]: [87000 138000 138000000 19000 48300] // - // Data After: // record: // schema: - // fields: 3 + // fields: 4 // - Ticker: type=utf8, nullable // metadata: ["deephaven:isDateFormat": "false", "deephaven:isNumberFormat": "false", "deephaven:isPartitioning": "false", "deephaven:isRowStyle": "false", "deephaven:isSortable": "true", "deephaven:isStyle": "false", "deephaven:type": "java.lang.String"] // - Close: type=float32, nullable // metadata: ["deephaven:isDateFormat": "false", "deephaven:isNumberFormat": "false", "deephaven:isPartitioning": "false", "deephaven:isRowStyle": "false", "deephaven:isSortable": "true", "deephaven:isStyle": "false", "deephaven:type": "float"] // - Volume: type=int32, nullable // metadata: ["deephaven:isDateFormat": "false", "deephaven:isNumberFormat": "false", "deephaven:isPartitioning": "false", "deephaven:isRowStyle": "false", "deephaven:isSortable": "true", "deephaven:isStyle": "false", "deephaven:type": "int"] - // metadata: ["deephaven:attribute.AddOnly": "true", "deephaven:attribute.AppendOnly": "true", "deephaven:attribute.SortedColumns": "Close=Ascending", "deephaven:attribute_type.AddOnly": "java.lang.Boolean", "deephaven:attribute_type.AppendOnly": "java.lang.Boolean", "deephaven:attribute_type.SortedColumns": "java.lang.String"] + // - Magnitude: type=int32, nullable + // metadata: ["deephaven:isDateFormat": "false", "deephaven:isNumberFormat": "false", "deephaven:isPartitioning": "false", "deephaven:isRowStyle": "false", "deephaven:isSortable": "true", "deephaven:isStyle": "false", "deephaven:type": "int"] // rows: 5 - // col[0][Ticker]: ["IBM" "XRX" "XYZZY" "GME" "ZNGA"] - // col[1][Close]: [38.7 53.8 88.5 453 544.9] - // col[2][Volume]: [138000 87000 6060842 138000000 48300] + // col[0][Ticker]: ["XRX" "IBM" "GME" "AAPL" "ZNGA"] + // col[1][Close]: [53.8 38.7 453 26.7 544.9] + // col[2][Volume]: [87000 138000 138000000 19000 48300] + // col[3][Magnitude]: [10000 100000 100000000 10000 10000] } // This function demonstrates how to use immediate table operations. diff --git a/server/jetty/src/test/java/io/deephaven/server/jetty/JettyBarrageChunkFactoryTest.java b/server/jetty/src/test/java/io/deephaven/server/jetty/JettyBarrageChunkFactoryTest.java index 8e3a9f2c2df..ade04d7ec4d 100644 --- a/server/jetty/src/test/java/io/deephaven/server/jetty/JettyBarrageChunkFactoryTest.java +++ b/server/jetty/src/test/java/io/deephaven/server/jetty/JettyBarrageChunkFactoryTest.java @@ -7,7 +7,6 @@ import dagger.Module; import dagger.Provides; import dagger.multibindings.IntoSet; -import io.deephaven.UncheckedDeephavenException; import io.deephaven.auth.AuthContext; import io.deephaven.base.clock.Clock; import io.deephaven.client.impl.BearerHandler; @@ -24,7 +23,6 @@ import io.deephaven.engine.util.AbstractScriptSession; import io.deephaven.engine.util.NoLanguageDeephavenSession; import io.deephaven.engine.util.ScriptSession; -import io.deephaven.engine.util.TableTools; import io.deephaven.extensions.barrage.util.BarrageUtil; import io.deephaven.io.logger.LogBuffer; import io.deephaven.io.logger.LogBufferGlobal; @@ -51,6 +49,7 @@ import io.deephaven.server.test.TestAuthModule; import io.deephaven.server.test.TestAuthorizationProvider; import io.deephaven.server.util.Scheduler; +import io.deephaven.util.QueryConstants; import io.deephaven.util.SafeCloseable; import io.grpc.CallOptions; import io.grpc.Channel; @@ -72,9 +71,12 @@ import org.apache.arrow.flight.auth2.Auth2Constants; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.UInt1Vector; +import org.apache.arrow.vector.UInt2Vector; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; @@ -98,21 +100,22 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; -import java.util.function.Consumer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; public class JettyBarrageChunkFactoryTest { private static final String COLUMN_NAME = "test_col"; + private static final int NUM_ROWS = 1000; + private static final int RANDOM_SEED = 42; @Module public interface JettyTestConfig { @@ -318,7 +321,7 @@ public TestAuthClientInterceptor(String bearerToken) { @Override public ClientCall interceptCall(MethodDescriptor method, - CallOptions callOptions, Channel next) { + CallOptions callOptions, Channel next) { return next.newCall(method, callOptions.withCallCredentials(callCredentials)); } } @@ -374,122 +377,344 @@ protected void after() { } }; - private void fullyReadStream(Ticket ticket, boolean expectError) { - try (final FlightStream stream = flightClient.getStream(ticket)) { - // noinspection StatementWithEmptyBody - while (stream.next()); - if (expectError) { - fail("expected error"); - } - } catch (Exception ignored) { - } + private Schema createSchema(boolean nullable, ArrowType arrowType, Class dhType) { + return createSchema(nullable, arrowType, dhType, null); } - private Schema createSchema(final ArrowType arrowType, final Class dhType) { - return createSchema(arrowType, dhType, null); - } - - private Schema createSchema(final ArrowType arrowType, final Class dhType, final Class dhComponentType) { + private Schema createSchema( + final boolean nullable, + final ArrowType arrowType, + final Class dhType, + final Class dhComponentType) { final Map attrs = new HashMap<>(); attrs.put(BarrageUtil.ATTR_DH_PREFIX + BarrageUtil.ATTR_TYPE_TAG, dhType.getCanonicalName()); if (dhComponentType != null) { attrs.put(BarrageUtil.ATTR_DH_PREFIX + BarrageUtil.ATTR_COMPONENT_TYPE_TAG, dhComponentType.getCanonicalName()); } - final FieldType fieldType = new FieldType(true, arrowType, null, attrs); + final FieldType fieldType = new FieldType(nullable, arrowType, null, attrs); return new Schema(Collections.singletonList( new Field(COLUMN_NAME, fieldType, null))); } @Test public void testInt8() throws Exception { - final int numRows = 16; - final Consumer setupData = source -> { - IntVector vector = (IntVector) source.getFieldVectors().get(0); - for (int ii = 0; ii < numRows; ++ii) { - if (ii % 2 == 0) { - vector.setNull(ii); - } else { - vector.set(ii, (byte) (ii - 8)); + class Test extends RoundTripTest { + Test(Class dhType) { + super(dhType); + } + + @Override + public Schema newSchema(boolean isNullable) { + return createSchema(isNullable, new ArrowType.Int(8, true), dhType); + } + + @Override + public int initializeRoot(@NotNull TinyIntVector source) { + int start = setAll(source::set, + QueryConstants.MIN_BYTE, QueryConstants.MAX_BYTE, (byte) -1, (byte) 0, (byte) 1); + for (int ii = start; ii < NUM_ROWS; ++ii) { + byte value = (byte) rnd.nextInt(); + source.set(ii, value); + if (value == QueryConstants.NULL_BYTE) { + --ii; + } + } + return NUM_ROWS; + } + + @Override + public void validate(@NotNull TinyIntVector source, @NotNull TinyIntVector dest) { + for (int ii = 0; ii < source.getValueCount(); ++ii) { + if (source.isNull(ii)) { + assertTrue(dest.isNull(ii)); + } else if (dhType == char.class && source.get(ii) == -1) { + // this is going to be coerced to null if nullable or else NULL_BYTE if non-nullable + assertTrue(dest.isNull(ii) || dest.get(ii) == QueryConstants.NULL_BYTE); + } else { + assertEquals(source.get(ii), dest.get(ii)); + } + } + } + } + + new Test(byte.class).doTest(); + new Test(char.class).doTest(); + new Test(short.class).doTest(); + new Test(int.class).doTest(); + new Test(long.class).doTest(); + new Test(float.class).doTest(); + new Test(double.class).doTest(); + new Test(BigInteger.class).doTest(); + new Test(BigDecimal.class).doTest(); + } + + @Test + public void testUint8() throws Exception { + class Test extends RoundTripTest { + Test(Class dhType) { + super(dhType); + } + + @Override + public Schema newSchema(boolean isNullable) { + return createSchema(isNullable, new ArrowType.Int(8, false), dhType); + } + + @Override + public int initializeRoot(@NotNull UInt1Vector source) { + int start = setAll(source::set, + QueryConstants.MIN_BYTE, QueryConstants.MAX_BYTE, (byte) -1, (byte) 0, (byte) 1); + for (int ii = start; ii < NUM_ROWS; ++ii) { + byte value = (byte) rnd.nextInt(); + source.set(ii, value); + if (value == QueryConstants.NULL_BYTE) { + --ii; + } + } + return NUM_ROWS; + } + + @Override + public void validate(@NotNull UInt1Vector source, @NotNull UInt1Vector dest) { + for (int ii = 0; ii < source.getValueCount(); ++ii) { + if (source.isNull(ii)) { + assertTrue(dest.isNull(ii)); + } else if (dhType == char.class && source.get(ii) == -1) { + // this is going to be coerced to null if nullable or else NULL_BYTE if non-nullable + assertTrue(dest.isNull(ii) || dest.get(ii) == QueryConstants.NULL_BYTE); + } else { + assertEquals(source.get(ii), dest.get(ii)); + } + } + } + } + + new Test(byte.class).doTest(); + new Test(char.class).doTest(); + new Test(short.class).doTest(); + new Test(int.class).doTest(); + new Test(long.class).doTest(); + new Test(float.class).doTest(); + new Test(double.class).doTest(); + new Test(BigInteger.class).doTest(); + new Test(BigDecimal.class).doTest(); + } + + @Test + public void testInt16() throws Exception { + class Test extends RoundTripTest { + Test(Class dhType) { + super(dhType); + } + + @Override + public Schema newSchema(boolean isNullable) { + return createSchema(isNullable, new ArrowType.Int(16, true), dhType); + } + + @Override + public int initializeRoot(@NotNull SmallIntVector source) { + int start = setAll(source::set, + QueryConstants.MIN_SHORT, QueryConstants.MAX_SHORT, (short) -1, (short) 0, (short) 1); + for (int ii = start; ii < NUM_ROWS; ++ii) { + short value = (short) rnd.nextInt(); + source.set(ii, value); + if (value == QueryConstants.NULL_SHORT) { + --ii; + } } + return NUM_ROWS; } - source.setRowCount(numRows); - }; - final BiConsumer validator = (source, dest) -> { - IntVector sVector = (IntVector) source.getVector(0); - IntVector dVector = (IntVector) dest.getVector(0); - for (int ii = 0; ii < numRows; ii++) { - if (ii % 2 == 0) { - assertTrue(dVector.isNull(ii)); - } else { - assertEquals(sVector.get(ii), dVector.get(ii)); + + @Override + public void validate(@NotNull SmallIntVector source, @NotNull SmallIntVector dest) { + for (int ii = 0; ii < source.getValueCount(); ++ii) { + if (source.isNull(ii)) { + assertTrue(dest.isNull(ii)); + } else if (dhType == byte.class) { + byte asByte = (byte) source.get(ii); + if (asByte == QueryConstants.NULL_BYTE) { + assertTrue(dest.isNull(ii) || dest.get(ii) == QueryConstants.NULL_SHORT); + } else { + assertEquals(asByte, dest.get(ii)); + } + } else if (dhType == char.class && source.get(ii) == -1) { + // this is going to be coerced to null if nullable or else NULL_BYTE if non-nullable + assertTrue(dest.isNull(ii) || dest.get(ii) == QueryConstants.NULL_SHORT); + } else { + assertEquals(source.get(ii), dest.get(ii)); + } } } - }; - final Consumer> runForDhType = dhType -> { - Schema schema = createSchema(Types.MinorType.INT.getType(), dhType); - testRoundTrip(dhType, null, schema, setupData, validator); - }; - - runForDhType.accept(byte.class); -// runForDhType.accept(char.class); - runForDhType.accept(short.class); - runForDhType.accept(int.class); - runForDhType.accept(long.class); - runForDhType.accept(float.class); - runForDhType.accept(double.class); - runForDhType.accept(BigInteger.class); - runForDhType.accept(BigDecimal.class); + } + + new Test(byte.class).doTest(); + new Test(char.class).doTest(); + new Test(short.class).doTest(); + new Test(int.class).doTest(); + new Test(long.class).doTest(); + new Test(float.class).doTest(); + new Test(double.class).doTest(); + new Test(BigInteger.class).doTest(); + new Test(BigDecimal.class).doTest(); } - private void testRoundTrip( - @NotNull final Class dhType, - @Nullable final Class componentType, - @NotNull final Schema schema, - @NotNull final Consumer setupData, - @NotNull final BiConsumer validator) { - try (VectorSchemaRoot source = VectorSchemaRoot.create(schema, allocator)) { - source.allocateNew(); - setupData.accept(source); - - int flightDescriptorTicketValue = nextTicket++; - FlightDescriptor descriptor = FlightDescriptor.path("export", flightDescriptorTicketValue + ""); - FlightClient.ClientStreamListener putStream = flightClient.startPut(descriptor, source, new AsyncPutListener()); - putStream.putNext(); - putStream.completed(); - - // get the table that was uploaded, and confirm it matches what we originally sent - CompletableFuture tableFuture = new CompletableFuture<>(); - SessionState.ExportObject
tableExport = currentSession.getExport(flightDescriptorTicketValue); - currentSession.nonExport() - .onErrorHandler(exception -> tableFuture.cancel(true)) - .require(tableExport) - .submit(() -> tableFuture.complete(tableExport.get())); - - // block until we're done, so we can get the table and see what is inside - putStream.getResult(); - Table uploadedTable = tableFuture.get(); - assertEquals(source.getRowCount(), uploadedTable.size()); - assertEquals(1, uploadedTable.getColumnSourceMap().size()); - ColumnSource columnSource = uploadedTable.getColumnSource(COLUMN_NAME); - assertNotNull(columnSource); - assertEquals(columnSource.getType(), dhType); - assertEquals(columnSource.getComponentType(), componentType); - - try (FlightStream stream = flightClient.getStream(flightTicketFor(flightDescriptorTicketValue))) { - VectorSchemaRoot dest = stream.getRoot(); - - int numPayloads = 0; - while (stream.next()) { - assertEquals(source.getRowCount(), dest.getRowCount()); - validator.accept(source, dest); - ++numPayloads; + @Test + public void testUint16() throws Exception { + class Test extends RoundTripTest { + Test(Class dhType) { + super(dhType); + } + + @Override + public Schema newSchema(boolean isNullable) { + return createSchema(isNullable, new ArrowType.Int(16, false), dhType); + } + + @Override + public int initializeRoot(@NotNull UInt2Vector source) { + int start = setAll(source::set, + (char) 6784, + QueryConstants.MIN_CHAR, QueryConstants.MAX_CHAR, (char) 1); + for (int ii = start; ii < NUM_ROWS; ++ii) { + char value = (char) rnd.nextInt(); + source.set(ii, value); + if (value == QueryConstants.NULL_CHAR) { + --ii; + } + } + return NUM_ROWS; + } + + @Override + public void validate(@NotNull UInt2Vector source, @NotNull UInt2Vector dest) { + for (int ii = 0; ii < source.getValueCount(); ++ii) { + if (source.isNull(ii)) { + assertTrue(dest.isNull(ii)); + } else if (dhType == byte.class) { + byte asByte = (byte) source.get(ii); + if (asByte == QueryConstants.NULL_BYTE || asByte == -1) { + assertTrue(dest.isNull(ii) || dest.get(ii) == QueryConstants.NULL_CHAR); + } else { + assertEquals((char) asByte, dest.get(ii)); + } + } else { + assertEquals(source.get(ii), dest.get(ii)); + } } + } + } + + new Test(byte.class).doTest(); + new Test(char.class).doTest(); + new Test(short.class).doTest(); + new Test(int.class).doTest(); + new Test(long.class).doTest(); + new Test(float.class).doTest(); + new Test(double.class).doTest(); + new Test(BigInteger.class).doTest(); + new Test(BigDecimal.class).doTest(); + } - assertEquals(1, numPayloads); + // For list tests: test both head and tail via FixedSizeList limits + + private static int setAll(BiConsumer setter, T... values) { + for (int ii = 0; ii < values.length; ++ii) { + setter.accept(ii, values[ii]); + } + return values.length; + } + + protected enum NullMode { ALL, NONE, SOME, NOT_NULLABLE } + private abstract class RoundTripTest { + protected final Random rnd = new Random(RANDOM_SEED); + protected Class dhType; + protected Class componentType; + + public RoundTripTest(@NotNull final Class dhType) { + this(dhType, null); + } + + public RoundTripTest(@NotNull final Class dhType, @Nullable final Class componentType) { + this.dhType = dhType; + this.componentType = componentType; + } + + public abstract Schema newSchema(boolean isNullable); + public abstract int initializeRoot(@NotNull final T source); + public abstract void validate(@NotNull final T source, @NotNull final T dest); + + public void doTest() throws Exception { + doTest(NullMode.NOT_NULLABLE); + doTest(NullMode.NONE); + doTest(NullMode.SOME); + doTest(NullMode.ALL); + } + + public void doTest(final NullMode nullMode) throws Exception { + final Schema schema = newSchema(nullMode != NullMode.NOT_NULLABLE); + try (VectorSchemaRoot source = VectorSchemaRoot.create(schema, allocator)) { + source.allocateNew(); + // noinspection unchecked + int numRows = initializeRoot((T) source.getFieldVectors().get(0)); + source.setRowCount(numRows); + + if (nullMode == NullMode.ALL) { + for (FieldVector vector : source.getFieldVectors()) { + for (int ii = 0; ii < source.getRowCount(); ++ii) { + vector.setNull(ii); + } + } + } else if (nullMode == NullMode.SOME) { + for (FieldVector vector : source.getFieldVectors()) { + for (int ii = 0; ii < source.getRowCount(); ++ii) { + if (rnd.nextBoolean()) { + vector.setNull(ii); + } + } + } + } + + int flightDescriptorTicketValue = nextTicket++; + FlightDescriptor descriptor = FlightDescriptor.path("export", flightDescriptorTicketValue + ""); + FlightClient.ClientStreamListener putStream = + flightClient.startPut(descriptor, source, new AsyncPutListener()); + putStream.putNext(); + putStream.completed(); + + // get the table that was uploaded, and confirm it matches what we originally sent + CompletableFuture
tableFuture = new CompletableFuture<>(); + SessionState.ExportObject
tableExport = currentSession.getExport(flightDescriptorTicketValue); + currentSession.nonExport() + .onErrorHandler(exception -> tableFuture.cancel(true)) + .require(tableExport) + .submit(() -> tableFuture.complete(tableExport.get())); + + // block until we're done, so we can get the table and see what is inside + putStream.getResult(); + Table uploadedTable = tableFuture.get(); + assertEquals(source.getRowCount(), uploadedTable.size()); + assertEquals(1, uploadedTable.getColumnSourceMap().size()); + ColumnSource columnSource = uploadedTable.getColumnSource(COLUMN_NAME); + assertNotNull(columnSource); + assertEquals(columnSource.getType(), dhType); + assertEquals(columnSource.getComponentType(), componentType); + + try (FlightStream stream = flightClient.getStream(flightTicketFor(flightDescriptorTicketValue))) { + VectorSchemaRoot dest = stream.getRoot(); + + int numPayloads = 0; + while (stream.next()) { + assertEquals(source.getRowCount(), dest.getRowCount()); + // noinspection unchecked + validate((T) source.getFieldVectors().get(0), (T) dest.getFieldVectors().get(0)); + ++numPayloads; + } + + assertEquals(1, numPayloads); + } } - } catch (Exception err) { - throw new UncheckedDeephavenException("round trip test failure", err); } } diff --git a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_012dc568a40e08aeb849b71227532f8ebe42accea1f4bbe4a7e3b8c7f433ff9cv64_0/Formula$FormulaFillContext.class b/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_012dc568a40e08aeb849b71227532f8ebe42accea1f4bbe4a7e3b8c7f433ff9cv64_0/Formula$FormulaFillContext.class deleted file mode 100644 index 35b25183f5b..00000000000 Binary files a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_012dc568a40e08aeb849b71227532f8ebe42accea1f4bbe4a7e3b8c7f433ff9cv64_0/Formula$FormulaFillContext.class and /dev/null differ diff --git a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_012dc568a40e08aeb849b71227532f8ebe42accea1f4bbe4a7e3b8c7f433ff9cv64_0/Formula.class b/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_012dc568a40e08aeb849b71227532f8ebe42accea1f4bbe4a7e3b8c7f433ff9cv64_0/Formula.class deleted file mode 100644 index 4b94afb1edc..00000000000 Binary files a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_012dc568a40e08aeb849b71227532f8ebe42accea1f4bbe4a7e3b8c7f433ff9cv64_0/Formula.class and /dev/null differ diff --git a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_4e4c6857a5b4178aa7be3875b46d075b3a7c11b827374e96f98cea9d064428fcv64_0/Formula$FormulaFillContext.class b/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_4e4c6857a5b4178aa7be3875b46d075b3a7c11b827374e96f98cea9d064428fcv64_0/Formula$FormulaFillContext.class deleted file mode 100644 index 65721fa6542..00000000000 Binary files a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_4e4c6857a5b4178aa7be3875b46d075b3a7c11b827374e96f98cea9d064428fcv64_0/Formula$FormulaFillContext.class and /dev/null differ diff --git a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_4e4c6857a5b4178aa7be3875b46d075b3a7c11b827374e96f98cea9d064428fcv64_0/Formula.class b/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_4e4c6857a5b4178aa7be3875b46d075b3a7c11b827374e96f98cea9d064428fcv64_0/Formula.class deleted file mode 100644 index 8fa7786004e..00000000000 Binary files a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_4e4c6857a5b4178aa7be3875b46d075b3a7c11b827374e96f98cea9d064428fcv64_0/Formula.class and /dev/null differ diff --git a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_673008ef261faf1b24552c7eebcc2ab0541a8efe127fe785886df2cb8b73b4b0v64_0/Formula$FormulaFillContext.class b/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_673008ef261faf1b24552c7eebcc2ab0541a8efe127fe785886df2cb8b73b4b0v64_0/Formula$FormulaFillContext.class deleted file mode 100644 index b700916b7aa..00000000000 Binary files a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_673008ef261faf1b24552c7eebcc2ab0541a8efe127fe785886df2cb8b73b4b0v64_0/Formula$FormulaFillContext.class and /dev/null differ diff --git a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_673008ef261faf1b24552c7eebcc2ab0541a8efe127fe785886df2cb8b73b4b0v64_0/Formula.class b/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_673008ef261faf1b24552c7eebcc2ab0541a8efe127fe785886df2cb8b73b4b0v64_0/Formula.class deleted file mode 100644 index 32e7c9ab7cb..00000000000 Binary files a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_673008ef261faf1b24552c7eebcc2ab0541a8efe127fe785886df2cb8b73b4b0v64_0/Formula.class and /dev/null differ diff --git a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_c88a20a36eff96f6e498144f56f8a303d3f649602ac336ea7143a3004a74850bv64_0/Formula$FormulaFillContext.class b/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_c88a20a36eff96f6e498144f56f8a303d3f649602ac336ea7143a3004a74850bv64_0/Formula$FormulaFillContext.class deleted file mode 100644 index e16bbb7397d..00000000000 Binary files a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_c88a20a36eff96f6e498144f56f8a303d3f649602ac336ea7143a3004a74850bv64_0/Formula$FormulaFillContext.class and /dev/null differ diff --git a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_c88a20a36eff96f6e498144f56f8a303d3f649602ac336ea7143a3004a74850bv64_0/Formula.class b/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_c88a20a36eff96f6e498144f56f8a303d3f649602ac336ea7143a3004a74850bv64_0/Formula.class deleted file mode 100644 index 9456786d522..00000000000 Binary files a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_c88a20a36eff96f6e498144f56f8a303d3f649602ac336ea7143a3004a74850bv64_0/Formula.class and /dev/null differ diff --git a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_c8f9bf468d6a56caeae53546d948fab7d54706d9c674dc5e943d94d3aa7390ffv64_0/Formula$FormulaFillContext.class b/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_c8f9bf468d6a56caeae53546d948fab7d54706d9c674dc5e943d94d3aa7390ffv64_0/Formula$FormulaFillContext.class deleted file mode 100644 index 2c896130a73..00000000000 Binary files a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_c8f9bf468d6a56caeae53546d948fab7d54706d9c674dc5e943d94d3aa7390ffv64_0/Formula$FormulaFillContext.class and /dev/null differ diff --git a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_c8f9bf468d6a56caeae53546d948fab7d54706d9c674dc5e943d94d3aa7390ffv64_0/Formula.class b/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_c8f9bf468d6a56caeae53546d948fab7d54706d9c674dc5e943d94d3aa7390ffv64_0/Formula.class deleted file mode 100644 index e4eddfba2ec..00000000000 Binary files a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_c8f9bf468d6a56caeae53546d948fab7d54706d9c674dc5e943d94d3aa7390ffv64_0/Formula.class and /dev/null differ diff --git a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_d85e090333ad41d34f0b7335ffcf5c5a6fbf910bfbaab31f07bdeea0d64893aev64_0/Formula$FormulaFillContext.class b/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_d85e090333ad41d34f0b7335ffcf5c5a6fbf910bfbaab31f07bdeea0d64893aev64_0/Formula$FormulaFillContext.class deleted file mode 100644 index 8a9745e84a4..00000000000 Binary files a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_d85e090333ad41d34f0b7335ffcf5c5a6fbf910bfbaab31f07bdeea0d64893aev64_0/Formula$FormulaFillContext.class and /dev/null differ diff --git a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_d85e090333ad41d34f0b7335ffcf5c5a6fbf910bfbaab31f07bdeea0d64893aev64_0/Formula.class b/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_d85e090333ad41d34f0b7335ffcf5c5a6fbf910bfbaab31f07bdeea0d64893aev64_0/Formula.class deleted file mode 100644 index 0f950f2e901..00000000000 Binary files a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_d85e090333ad41d34f0b7335ffcf5c5a6fbf910bfbaab31f07bdeea0d64893aev64_0/Formula.class and /dev/null differ diff --git a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_f0a36e4bf7dd41d038f9bd24522644fb6cedf543b97d594ef5ad5726bfe91cfev64_0/Formula$FormulaFillContext.class b/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_f0a36e4bf7dd41d038f9bd24522644fb6cedf543b97d594ef5ad5726bfe91cfev64_0/Formula$FormulaFillContext.class deleted file mode 100644 index 358924deb70..00000000000 Binary files a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_f0a36e4bf7dd41d038f9bd24522644fb6cedf543b97d594ef5ad5726bfe91cfev64_0/Formula$FormulaFillContext.class and /dev/null differ diff --git a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_f0a36e4bf7dd41d038f9bd24522644fb6cedf543b97d594ef5ad5726bfe91cfev64_0/Formula.class b/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_f0a36e4bf7dd41d038f9bd24522644fb6cedf543b97d594ef5ad5726bfe91cfev64_0/Formula.class deleted file mode 100644 index 04c552f75e0..00000000000 Binary files a/server/netty/io.deephaven.engine.context.QueryCompiler.createForUnitTests/io/deephaven/temp/c_f0a36e4bf7dd41d038f9bd24522644fb6cedf543b97d594ef5ad5726bfe91cfev64_0/Formula.class and /dev/null differ diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java.orig b/server/src/main/java/io/deephaven/server/session/SessionState.java.orig deleted file mode 100644 index e45a86b33c6..00000000000 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java.orig +++ /dev/null @@ -1,1558 +0,0 @@ -/** - * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending - */ -package io.deephaven.server.session; - -import com.github.f4b6a3.uuid.UuidCreator; -import com.google.rpc.Code; -import dagger.assisted.Assisted; -import dagger.assisted.AssistedFactory; -import dagger.assisted.AssistedInject; -import io.deephaven.base.reference.WeakSimpleReference; -import io.deephaven.base.verify.Assert; -import io.deephaven.engine.liveness.LivenessArtifact; -import io.deephaven.engine.liveness.LivenessReferent; -import io.deephaven.engine.liveness.LivenessScopeStack; -import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget; -import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; -import io.deephaven.engine.table.impl.perf.QueryState; -import io.deephaven.engine.table.impl.util.EngineMetrics; -import io.deephaven.engine.updategraph.DynamicNode; -import io.deephaven.hash.KeyedIntObjectHash; -import io.deephaven.hash.KeyedIntObjectHashMap; -import io.deephaven.hash.KeyedIntObjectKey; -import io.deephaven.internal.log.LoggerFactory; -import io.deephaven.io.log.LogEntry; -import io.deephaven.io.logger.Logger; -import io.deephaven.proto.backplane.grpc.ExportNotification; -import io.deephaven.proto.backplane.grpc.Ticket; -import io.deephaven.proto.flight.util.FlightExportTicketHelper; -import io.deephaven.proto.util.Exceptions; -import io.deephaven.proto.util.ExportTicketHelper; -import io.deephaven.server.util.Scheduler; -import io.deephaven.engine.context.ExecutionContext; -import io.deephaven.util.SafeCloseable; -import io.deephaven.util.annotations.VisibleForTesting; -import io.deephaven.auth.AuthContext; -import io.deephaven.util.datastructures.SimpleReferenceManager; -import io.deephaven.util.process.ProcessEnvironment; -import io.grpc.StatusRuntimeException; -import io.grpc.stub.StreamObserver; -import org.apache.arrow.flight.impl.Flight; -import org.apache.commons.lang3.mutable.MutableObject; -import org.jetbrains.annotations.NotNull; - -import javax.annotation.Nullable; -import javax.inject.Provider; -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.Callable; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.function.Consumer; - -import static io.deephaven.base.log.LogOutput.MILLIS_FROM_EPOCH_FORMATTER; -import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete; -import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyError; - -/** - * SessionState manages all exports for a single session. - * - *

- * It manages exported {@link LivenessReferent}. It cascades failures to child dependencies. - * - *

- * TODO: - cyclical dependency detection - out-of-order dependency timeout - * - *

- * Details Regarding Data Structure of ExportObjects: - * - *

    - *
  • The exportMap map, exportListeners list, exportListenerVersion, and export object's exportListenerVersion work - * together to enable a listener to synchronize with outstanding exports in addition to sending the listener updates - * while they continue to subscribe.
  • - * - *
  • SessionState::exportMap's purpose is to map from the export id to the export object
  • - *
  • SessionState::exportListeners' purpose is to keep a list of active subscribers
  • - *
  • SessionState::exportListenerVersion's purpose is to know whether or not a subscriber has already seen a - * status
  • - * - *
  • A listener will receive an export notification for export id NON_EXPORT_ID (a zero) to indicate that the run has - * completed. A listener may see an update for an export before receiving the "run has completed" message. A listener - * should be prepared to receive duplicate/redundant updates.
  • - *
- */ -public class SessionState { - // Some work items will be dependent on other exports, but do not export anything themselves. - public static final int NON_EXPORT_ID = 0; - - @AssistedFactory - public interface Factory { - SessionState create(AuthContext authContext); - } - - /** - * Wrap an object in an ExportObject to make it conform to the session export API. - * - * @param export the object to wrap - * @param the type of the object - * @return a sessionless export object - */ - public static ExportObject wrapAsExport(final T export) { - return new ExportObject<>(export); - } - - /** - * Wrap an exception in an ExportObject to make it conform to the session export API. The export behaves as if it - * has already failed. - * - * @param caughtException the exception to propagate - * @param the type of the object - * @return a sessionless export object - */ - public static ExportObject wrapAsFailedExport(final Exception caughtException) { - ExportObject exportObject = new ExportObject<>(null); - exportObject.caughtException = caughtException; - return exportObject; - } - - private static final Logger log = LoggerFactory.getLogger(SessionState.class); - - private final String logPrefix; - private final Scheduler scheduler; - private final SessionService.ErrorTransformer errorTransformer; - private final AuthContext authContext; - - private final String sessionId; - private volatile SessionService.TokenExpiration expiration = null; - private static final AtomicReferenceFieldUpdater EXPIRATION_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(SessionState.class, SessionService.TokenExpiration.class, - "expiration"); - - // some types of exports have a more sound story if the server tells the client what to call it - private volatile int nextServerAllocatedId = -1; - private static final AtomicIntegerFieldUpdater SERVER_EXPORT_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(SessionState.class, "nextServerAllocatedId"); - - // maintains all requested exports by this client's session - private final KeyedIntObjectHashMap> exportMap = new KeyedIntObjectHashMap<>(EXPORT_OBJECT_ID_KEY); - - // the list of active listeners - private final List exportListeners = new CopyOnWriteArrayList<>(); - private volatile int exportListenerVersion = 0; - - // Usually, export life cycles are managed explicitly with the life cycle of the session state. However, we need - // to be able to close non-exports that are not in the map but are otherwise satisfying outstanding gRPC requests. - private final SimpleReferenceManager> onCloseCallbacks = - new SimpleReferenceManager<>(WeakSimpleReference::new, false); - - private final ExecutionContext executionContext; - - @AssistedInject - public SessionState( - final Scheduler scheduler, - final SessionService.ErrorTransformer errorTransformer, - final Provider executionContextProvider, - @Assisted final AuthContext authContext) { - this.sessionId = UuidCreator.toString(UuidCreator.getRandomBased()); - this.logPrefix = "SessionState{" + sessionId + "}: "; - this.scheduler = scheduler; - this.errorTransformer = errorTransformer; - this.authContext = authContext; - this.executionContext = executionContextProvider.get().withAuthContext(authContext); - log.debug().append(logPrefix).append("session initialized").endl(); - } - - /** - * This method is controlled by SessionService to update the expiration whenever the session is refreshed. - * - * @param expiration the initial expiration time and session token - */ - @VisibleForTesting - protected void initializeExpiration(@NotNull final SessionService.TokenExpiration expiration) { - if (expiration.session != this) { - throw new IllegalArgumentException("mismatched session for expiration token"); - } - - if (!EXPIRATION_UPDATER.compareAndSet(this, null, expiration)) { - throw new IllegalStateException("session already initialized"); - } - - log.debug().append(logPrefix) - .append("token initialized to '").append(expiration.token.toString()) - .append("' which expires at ").append(MILLIS_FROM_EPOCH_FORMATTER, expiration.deadlineMillis) - .append(".").endl(); - } - - /** - * This method is controlled by SessionService to update the expiration whenever the session is refreshed. - * - * @param expiration the new expiration time and session token - */ - @VisibleForTesting - protected void updateExpiration(@NotNull final SessionService.TokenExpiration expiration) { - if (expiration.session != this) { - throw new IllegalArgumentException("mismatched session for expiration token"); - } - - SessionService.TokenExpiration prevToken = this.expiration; - while (prevToken != null) { - if (EXPIRATION_UPDATER.compareAndSet(this, prevToken, expiration)) { - break; - } - prevToken = this.expiration; - } - - if (prevToken == null) { - throw Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, "session has expired"); - } - - log.debug().append(logPrefix).append("token, expires at ") - .append(MILLIS_FROM_EPOCH_FORMATTER, expiration.deadlineMillis).append(".").endl(); - } - - /** - * @return the session id - */ - public String getSessionId() { - return sessionId; - } - - /** - * @return the current expiration token for this session - */ - public SessionService.TokenExpiration getExpiration() { - if (isExpired()) { - return null; - } - return expiration; - } - - /** - * @return whether or not this session is expired - */ - public boolean isExpired() { - final SessionService.TokenExpiration currToken = expiration; - return currToken == null || currToken.deadlineMillis <= scheduler.currentTimeMillis(); - } - - /** - * @return the auth context for this session - */ - public AuthContext getAuthContext() { - return authContext; - } - - /** - * @return the execution context for this session - */ - public ExecutionContext getExecutionContext() { - return executionContext; - } - - /** - * Grab the ExportObject for the provided ticket. - * - * @param ticket the export ticket - * @param logId an end-user friendly identification of the ticket should an error occur - * @return a future-like object that represents this export - */ - public ExportObject getExport(final Ticket ticket, final String logId) { - return getExport(ExportTicketHelper.ticketToExportId(ticket, logId)); - } - - /** - * Grab the ExportObject for the provided ticket. - * - * @param ticket the export ticket - * @param logId an end-user friendly identification of the ticket should an error occur - * @return a future-like object that represents this export - */ - public ExportObject getExport(final Flight.Ticket ticket, final String logId) { - return getExport(FlightExportTicketHelper.ticketToExportId(ticket, logId)); - } - - /** - * Grab the ExportObject for the provided id. - * - * @param exportId the export handle id - * @return a future-like object that represents this export - */ - @SuppressWarnings("unchecked") - public ExportObject getExport(final int exportId) { - if (isExpired()) { - throw Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, "session has expired"); - } - - final ExportObject result; - - if (exportId < NON_EXPORT_ID) { - // If this a server-side export then it must already exist or else is a user error. - result = (ExportObject) exportMap.get(exportId); - - if (result == null) { - throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, - "Export id " + exportId + " does not exist and cannot be used out-of-order!"); - } - } else if (exportId > NON_EXPORT_ID) { - // If this a client-side export we'll allow an out-of-order request by creating a new export object. - result = (ExportObject) exportMap.putIfAbsent(exportId, EXPORT_OBJECT_VALUE_FACTORY); - } else { - // If this is a non-export request, then it is a user error. - throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, - "Export id " + exportId + " refers to a non-export and cannot be requested!"); - } - - return result; - } - - /** - * Grab the ExportObject for the provided id if it already exists, otherwise return null. - * - * @param exportId the export handle id - * @return a future-like object that represents this export - */ - @SuppressWarnings("unchecked") - public ExportObject getExportIfExists(final int exportId) { - if (isExpired()) { - throw Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, "session has expired"); - } - - return (ExportObject) exportMap.get(exportId); - } - - /** - * Grab the ExportObject for the provided id if it already exists, otherwise return null. - * - * @param ticket the export ticket - * @param logId an end-user friendly identification of the ticket should an error occur - * @return a future-like object that represents this export - */ - public ExportObject getExportIfExists(final Ticket ticket, final String logId) { - return getExportIfExists(ExportTicketHelper.ticketToExportId(ticket, logId)); - } - - /** - * Create and export a pre-computed element. This is typically used in scenarios where the number of exports is not - * known in advance by the requesting client. - * - * @param export the result of the export - * @param the export type - * @return the ExportObject for this item for ease of access to the export - */ - public ExportObject newServerSideExport(final T export) { - if (isExpired()) { - throw Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, "session has expired"); - } - - final int exportId = SERVER_EXPORT_UPDATER.getAndDecrement(this); - - // noinspection unchecked - final ExportObject result = (ExportObject) exportMap.putIfAbsent(exportId, EXPORT_OBJECT_VALUE_FACTORY); - result.setResult(export); - return result; - } - - /** - * Create an ExportBuilder to create the export after dependencies are satisfied. - * - * @param ticket the grpc {@link Flight.Ticket} for this export - * @param logId an end-user friendly identification of the ticket should an error occur - * @param the export type that the callable will return - * @return an export builder - */ - public ExportBuilder newExport(final Flight.Ticket ticket, final String logId) { - return newExport(FlightExportTicketHelper.ticketToExportId(ticket, logId)); - } - - /** - * Create an ExportBuilder to create the export after dependencies are satisfied. - * - * @param ticket the grpc {@link Ticket} for this export - * @param logId an end-user friendly identification of the ticket should an error occur - * @param the export type that the callable will return - * @return an export builder - */ - public ExportBuilder newExport(final Ticket ticket, final String logId) { - return newExport(ExportTicketHelper.ticketToExportId(ticket, logId)); - } - - /** - * Create an ExportBuilder to create the export after dependencies are satisfied. - * - * @param exportId the export id - * @param the export type that the callable will return - * @return an export builder - */ - @VisibleForTesting - public ExportBuilder newExport(final int exportId) { - if (isExpired()) { - throw Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, "session has expired"); - } - if (exportId <= 0) { - throw new IllegalArgumentException("exportId's <= 0 are reserved for server allocation only"); - } - return new ExportBuilder<>(exportId); - } - - /** - * Create an ExportBuilder to perform work after dependencies are satisfied that itself does not create any exports. - * - * @return an export builder - */ - public ExportBuilder nonExport() { - if (isExpired()) { - throw Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, "session has expired"); - } - return new ExportBuilder<>(NON_EXPORT_ID); - } - - /** - * Attach an on-close callback bound to the life of the session. Note that {@link Closeable} does not require that - * the close() method be idempotent, but when combined with {@link #removeOnCloseCallback(Closeable)}, close() will - * only be called once from this class. - *

- *

- * If called after the session has expired, this will throw, and the close() method on the provided instance will - * not be called. - * - * @param onClose the callback to invoke at end-of-life - */ - public void addOnCloseCallback(final Closeable onClose) { - synchronized (onCloseCallbacks) { - if (isExpired()) { - // After the session has expired, nothing new can be added to the collection, so throw an exception (and - // release the lock, allowing each item already in the collection to be released) - throw Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, "session has expired"); - } - onCloseCallbacks.add(onClose); - } - } - - /** - * Remove an on-close callback bound to the life of the session. - *

- * A common pattern to use this will be for an object to try to remove itself, and if it succeeds, to call its own - * {@link Closeable#close()}. If it fails, it can expect to have close() be called automatically. - * - * @param onClose the callback to no longer invoke at end-of-life - * @return true iff the callback was removed - * @apiNote If this SessionState has already begun expiration processing, {@code onClose} will not be removed by - * this method. This means that if {@code onClose} was previously added and not removed, it either has - * already been invoked or will be invoked by the SessionState. - */ - public boolean removeOnCloseCallback(final Closeable onClose) { - if (isExpired()) { - // After the session has expired, nothing can be removed from the collection. - return false; - } - synchronized (onCloseCallbacks) { - return onCloseCallbacks.remove(onClose) != null; - } - } - - /** - * Notes that this session has expired and exports should be released. - */ - public void onExpired() { - // note that once we set expiration to null; we are not able to add any more objects to the exportMap - SessionService.TokenExpiration prevToken = expiration; - while (prevToken != null) { - if (EXPIRATION_UPDATER.compareAndSet(this, prevToken, null)) { - break; - } - prevToken = expiration; - } - if (prevToken == null) { - // already expired - return; - } - - log.debug().append(logPrefix).append("releasing outstanding exports").endl(); - synchronized (exportMap) { - exportMap.forEach(ExportObject::cancel); - exportMap.clear(); - } - - log.debug().append(logPrefix).append("outstanding exports released").endl(); - synchronized (exportListeners) { - exportListeners.forEach(ExportListener::onRemove); - exportListeners.clear(); - } - - final List callbacksToClose; - synchronized (onCloseCallbacks) { - callbacksToClose = new ArrayList<>(onCloseCallbacks.size()); - onCloseCallbacks.forEach((ref, callback) -> callbacksToClose.add(callback)); - onCloseCallbacks.clear(); - } - callbacksToClose.forEach(callback -> { - try { - callback.close(); - } catch (final IOException e) { - log.error().append(logPrefix).append("error during onClose callback: ").append(e).endl(); - } - }); - } - - /** - * @return true iff the provided export state is a failure state - */ - public static boolean isExportStateFailure(final ExportNotification.State state) { - return state == ExportNotification.State.FAILED || state == ExportNotification.State.CANCELLED - || state == ExportNotification.State.DEPENDENCY_FAILED - || state == ExportNotification.State.DEPENDENCY_NEVER_FOUND - || state == ExportNotification.State.DEPENDENCY_RELEASED - || state == ExportNotification.State.DEPENDENCY_CANCELLED; - } - - /** - * @return true iff the provided export state is a terminal state - */ - public static boolean isExportStateTerminal(final ExportNotification.State state) { - return state == ExportNotification.State.RELEASED || isExportStateFailure(state); - } - - /** - * This class represents one unit of content exported in the session. - * - *

- * Note: we reuse ExportObject for non-exporting tasks that have export dependencies. - * - * @param Is context-sensitive depending on the export. - * - * @apiNote ExportId may be 0, if this is a task that has exported dependencies, but does not export anything - * itself. Non-exports do not publish state changes. - */ - public final static class ExportObject extends LivenessArtifact { - private final int exportId; - private final String logIdentity; - private final SessionService.ErrorTransformer errorTransformer; - private final SessionState session; - - /** used to keep track of performance details either for aggregation or for the async ticket resolution */ - private QueryPerformanceRecorder queryPerformanceRecorder; - - /** final result of export */ - private volatile T result; - private volatile ExportNotification.State state = ExportNotification.State.UNKNOWN; - private volatile int exportListenerVersion = 0; - - /** Indicates whether this export has already been well defined. This prevents export object reuse. */ - private boolean hasHadWorkSet = false; - - /** This indicates whether or not this export should use the serial execution queue. */ - private boolean requiresSerialQueue; - - /** This is a reference of the work to-be-done. It is non-null only during the PENDING state. */ - private Callable exportMain; - /** This is a reference to the error handler to call if this item enters one of the failure states. */ - @Nullable - private ExportErrorHandler errorHandler; - /** This is a reference to the success handler to call if this item successfully exports. */ - @Nullable - private Consumer successHandler; - - /** used to keep track of which children need notification on export completion */ - private List> children = Collections.emptyList(); - /** used to manage liveness of dependencies (to prevent a dependency from being released before it is used) */ - private List> parents = Collections.emptyList(); - - /** used to detect when this object is ready for export (is visible for atomic int field updater) */ - private volatile int dependentCount = -1; - /** our first parent that was already released prior to having dependencies set if one exists */ - private ExportObject alreadyDeadParent; - - @SuppressWarnings("unchecked") - private static final AtomicIntegerFieldUpdater> DEPENDENT_COUNT_UPDATER = - AtomicIntegerFieldUpdater.newUpdater((Class>) (Class) ExportObject.class, - "dependentCount"); - - /** used to identify and propagate error details */ - private String errorId; - private String failedDependencyLogIdentity; - private Exception caughtException; - - /** - * @param errorTransformer the error transformer to use - * @param exportId the export id for this export - */ - private ExportObject( - final SessionService.ErrorTransformer errorTransformer, - final SessionState session, - final int exportId) { - super(true); - this.errorTransformer = errorTransformer; - this.session = session; - this.exportId = exportId; - this.logIdentity = - isNonExport() ? Integer.toHexString(System.identityHashCode(this)) : Long.toString(exportId); - setState(ExportNotification.State.UNKNOWN); - - // we retain a reference until a non-export becomes EXPORTED or a regular export becomes RELEASED - retainReference(); - } - - /** - * Create an ExportObject that is not tied to any session. These must be non-exports that have require no work - * to be performed. These export objects can be used as dependencies. - * - * @param result the object to wrap in an export - */ - private ExportObject(final T result) { - super(true); - this.errorTransformer = null; - this.session = null; - this.exportId = NON_EXPORT_ID; - this.result = result; - this.dependentCount = 0; - this.hasHadWorkSet = true; - this.logIdentity = Integer.toHexString(System.identityHashCode(this)) + "-sessionless"; - - if (result == null) { - maybeAssignErrorId(); - state = ExportNotification.State.FAILED; - } else { - state = ExportNotification.State.EXPORTED; - } - - if (result instanceof LivenessReferent && DynamicNode.notDynamicOrIsRefreshing(result)) { - manage((LivenessReferent) result); - } - } - - private boolean isNonExport() { - return exportId == NON_EXPORT_ID; - } - - private synchronized void setQueryPerformanceRecorder( - final QueryPerformanceRecorder queryPerformanceRecorder) { - if (this.queryPerformanceRecorder != null) { - throw new IllegalStateException( - "performance query recorder can only be set once on an exportable object"); - } - this.queryPerformanceRecorder = queryPerformanceRecorder; - } - - /** - * Sets the dependencies and tracks liveness dependencies. - * - * @param parents the dependencies that must be exported prior to invoking the exportMain callable - */ - private synchronized void setDependencies(final List> parents) { - if (dependentCount != -1) { - throw new IllegalStateException("dependencies can only be set once on an exportable object"); - } - - this.parents = parents; - dependentCount = parents.size(); - for (final ExportObject parent : parents) { - if (parent != null && !tryManage(parent)) { - // we've failed; let's cleanup already managed parents - forceReferenceCountToZero(); - alreadyDeadParent = parent; - break; - } - } - - if (log.isDebugEnabled()) { - final Exception e = new RuntimeException(); - final LogEntry entry = - log.debug().append(e).nl().append(session.logPrefix).append("export '").append(logIdentity) - .append("' has ").append(dependentCount).append(" dependencies remaining: "); - for (ExportObject parent : parents) { - entry.nl().append('\t').append(parent.logIdentity).append(" is ").append(parent.getState().name()); - } - entry.endl(); - } - } - - /** - * Sets the dependencies and initializes the relevant data structures to include this export as a child for - * each. - * - * @param exportMain the exportMain callable to invoke when dependencies are satisfied - * @param errorHandler the errorHandler to notify so that it may propagate errors to the requesting client - */ - private synchronized void setWork( - @NotNull final Callable exportMain, - @Nullable final ExportErrorHandler errorHandler, - @Nullable final Consumer successHandler, - final boolean requiresSerialQueue) { - if (hasHadWorkSet) { - throw new IllegalStateException("export object can only be defined once"); - } - hasHadWorkSet = true; - if (queryPerformanceRecorder != null && queryPerformanceRecorder.getState() == QueryState.RUNNING) { - // transfer ownership of the qpr to the export before it can be resumed by the scheduler - queryPerformanceRecorder.suspendQuery(); - } - this.requiresSerialQueue = requiresSerialQueue; - - // we defer this type of failure until setWork for consistency in error handling - if (alreadyDeadParent != null) { - onDependencyFailure(alreadyDeadParent); - alreadyDeadParent = null; - } - - if (isExportStateTerminal(state)) { - // The following scenarios cause us to get into this state: - // - this export object was released/cancelled - // - the session expiration propagated to this export object - // - a parent export was released/dead prior to `setDependencies` - // Note that failed dependencies will be handled in the onResolveOne method below. - - // since this is the first we know of the errorHandler, it could not have been invoked yet - if (errorHandler != null) { - maybeAssignErrorId(); - errorHandler.onError(state, errorId, caughtException, failedDependencyLogIdentity); - } - return; - } - - this.exportMain = exportMain; - this.errorHandler = errorHandler; - this.successHandler = successHandler; - - if (state != ExportNotification.State.PUBLISHING) { - setState(ExportNotification.State.PENDING); - } else if (dependentCount > 0) { - throw new IllegalStateException("published exports cannot have dependencies"); - } - if (dependentCount <= 0) { - dependentCount = 0; - scheduleExport(); - } else { - for (final ExportObject parent : parents) { - // we allow parents to be null to simplify calling conventions around optional dependencies - if (parent == null || !parent.maybeAddDependency(this)) { - onResolveOne(parent); - } - // else parent will notify us on completion - } - } - } - - /** - * WARNING! This method call is only safe to use in the following patterns: - *

- * 1) If an export (or non-export) {@link ExportBuilder#require}'d this export then the method is valid from - * within the Callable/Runnable passed to {@link ExportBuilder#submit}. - *

- * 2) By first obtaining a reference to the {@link ExportObject}, and then observing its state as - * {@link ExportNotification.State#EXPORTED}. The caller must abide by the Liveness API and dropReference. - *

- * Example: - * - *

-         * {@code
-         *  T getFromExport(ExportObject export) {
-         *     if (export.tryRetainReference()) {
-         *         try {
-         *             if (export.getState() == ExportNotification.State.EXPORTED) {
-         *                 return export.get();
-         *             }
-         *         } finally {
-         *             export.dropReference();
-         *         }
-         *     }
-         *     return null;
-         * }
-         * }
-         * 
- * - * @return the result of the computed export - */ - public T get() { - if (session != null && session.isExpired()) { - throw Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, "session has expired"); - } - final T localResult = result; - // Note: an export may be released while still being a dependency of queued work; so let's make sure we're - // still valid - if (localResult == null) { - throw new IllegalStateException( - "Dependent export '" + exportId + "' is null and in state " + state.name()); - } - return localResult; - } - - /** - * @return the current state of this export - */ - public ExportNotification.State getState() { - return state; - } - - /** - * @return the ticket for this export; note if this is a non-export the returned ticket will not resolve to - * anything and is considered an invalid ticket - */ - public Ticket getExportId() { - return ExportTicketHelper.wrapExportIdInTicket(exportId); - } - - /** - * Add dependency if object export has not yet completed. - * - * @param child the dependent task - * @return true if the child was added as a dependency - */ - private boolean maybeAddDependency(final ExportObject child) { - if (state == ExportNotification.State.EXPORTED || isExportStateTerminal(state)) { - return false; - } - synchronized (this) { - if (state == ExportNotification.State.EXPORTED || isExportStateTerminal(state)) { - return false; - } - - if (children.isEmpty()) { - children = new ArrayList<>(); - } - children.add(child); - return true; - } - } - - /** - * This helper notifies any export notification listeners, and propagates resolution to children that depend on - * this export. - * - * @param state the new state for this export - */ - private synchronized void setState(final ExportNotification.State state) { - if ((this.state == ExportNotification.State.EXPORTED && isNonExport()) - || isExportStateTerminal(this.state)) { - throw new IllegalStateException("cannot change state if export is already in terminal state"); - } - if (this.state != ExportNotification.State.UNKNOWN && this.state.getNumber() >= state.getNumber()) { - throw new IllegalStateException("export object state changes must advance toward a terminal state"); - } - this.state = state; - - // Send an export notification before possibly notifying children of our state change. - if (exportId != NON_EXPORT_ID) { - log.debug().append(session.logPrefix).append("export '").append(logIdentity) - .append("' is ExportState.").append(state.name()).endl(); - - final ExportNotification notification = makeExportNotification(); - exportListenerVersion = session.exportListenerVersion; - session.exportListeners.forEach(listener -> listener.notify(notification)); - } else { - log.debug().append(session == null ? "Session " : session.logPrefix) - .append("non-export '").append(logIdentity).append("' is ExportState.") - .append(state.name()).endl(); - } - - if (isExportStateFailure(state) && errorHandler != null) { - maybeAssignErrorId(); - try { - final Exception toReport; - if (caughtException != null && errorTransformer != null) { - toReport = errorTransformer.transform(caughtException); - } else { - toReport = caughtException; - } - - errorHandler.onError(state, errorId, toReport, failedDependencyLogIdentity); - } catch (final Throwable err) { - // this is a serious error; crash the jvm to ensure that we don't miss it - log.error().append("Unexpected error while reporting ExportObject failure: ").append(err).endl(); - ProcessEnvironment.getGlobalFatalErrorReporter().reportAsync( - "Unexpected error while reporting ExportObject failure", err); - } - } - - final boolean isNowExported = state == ExportNotification.State.EXPORTED; - if (isNowExported && successHandler != null) { - try { - successHandler.accept(result); - } catch (final Throwable err) { - // this is a serious error; crash the jvm to ensure that we don't miss it - log.error().append("Unexpected error while reporting ExportObject success: ").append(err).endl(); - ProcessEnvironment.getGlobalFatalErrorReporter().reportAsync( - "Unexpected error while reporting ExportObject success", err); - } - } - - if (isNowExported || isExportStateTerminal(state)) { - children.forEach(child -> child.onResolveOne(this)); - children = Collections.emptyList(); - parents.stream().filter(Objects::nonNull).forEach(this::tryUnmanage); - parents = Collections.emptyList(); - exportMain = null; - errorHandler = null; - successHandler = null; - } - - if ((isNowExported && isNonExport()) || isExportStateTerminal(state)) { - dropReference(); - } - } - - /** - * Decrements parent counter and kicks off the export if that was the last dependency. - * - * @param parent the parent that just resolved; it may have failed - */ - private void onResolveOne(@Nullable final ExportObject parent) { - // am I already cancelled or failed? - if (isExportStateTerminal(state)) { - return; - } - - // Is this a cascading failure? Note that we manage the parents in `setDependencies` which - // keeps the parent results live until this child been exported. This means that the parent is allowed to - // be in a RELEASED state, but is not allowed to be in a failure state. - if (parent != null && isExportStateFailure(parent.state)) { - onDependencyFailure(parent); - return; - } - - final int newDepCount = DEPENDENT_COUNT_UPDATER.decrementAndGet(this); - if (newDepCount > 0) { - return; // either more dependencies to wait for or this export has already failed - } - Assert.eqZero(newDepCount, "newDepCount"); - - scheduleExport(); - } - - /** - * Schedules the export to be performed; assumes all dependencies have been resolved. - */ - private void scheduleExport() { - synchronized (this) { - if (state != ExportNotification.State.PENDING && state != ExportNotification.State.PUBLISHING) { - return; - } - setState(ExportNotification.State.QUEUED); - } - - if (requiresSerialQueue) { - session.scheduler.runSerially(this::doExport); - } else { - session.scheduler.runImmediately(this::doExport); - } - } - - /** - * Performs the actual export on a scheduling thread. - */ - private void doExport() { - final Callable capturedExport; - synchronized (this) { - capturedExport = exportMain; - // check for some sort of cancel race with client - if (state != ExportNotification.State.QUEUED - || session.isExpired() - || capturedExport == null - || !tryRetainReference()) { - if (!isExportStateTerminal(state)) { - setState(ExportNotification.State.CANCELLED); - } else if (errorHandler != null) { - // noinspection ThrowableNotThrown - Assert.statementNeverExecuted("in terminal state but error handler is not null"); - } - return; - } - dropReference(); - setState(ExportNotification.State.RUNNING); - } - - T localResult = null; - boolean shouldLog = false; - final QueryPerformanceRecorder exportRecorder; - try (final SafeCloseable ignored1 = session.executionContext.open(); - final SafeCloseable ignored2 = LivenessScopeStack.open()) { - - final String queryId; - if (isNonExport()) { - queryId = "nonExport=" + logIdentity; - } else { - queryId = "exportId=" + logIdentity; - } - - final boolean isResume = queryPerformanceRecorder != null - && queryPerformanceRecorder.getState() == QueryState.SUSPENDED; - exportRecorder = Objects.requireNonNullElseGet(queryPerformanceRecorder, - () -> QueryPerformanceRecorder.newQuery("ExportObject#doWork(" + queryId + ")", - session.getSessionId(), QueryPerformanceNugget.DEFAULT_FACTORY)); - - try (final SafeCloseable ignored3 = isResume - ? exportRecorder.resumeQuery() - : exportRecorder.startQuery()) { - try { - localResult = capturedExport.call(); - } catch (final Exception err) { - caughtException = err; - } - shouldLog = exportRecorder.endQuery(); - } catch (final Exception err) { - // end query will throw if the export runner left the QPR in a bad state - if (caughtException == null) { - caughtException = err; - } - } - - if (caughtException != null) { - synchronized (this) { - if (!isExportStateTerminal(state)) { - maybeAssignErrorId(); - if (!(caughtException instanceof StatusRuntimeException)) { - log.error().append("Internal Error '").append(errorId).append("' ") - .append(caughtException).endl(); - } - setState(ExportNotification.State.FAILED); - } - } - } - if (shouldLog || caughtException != null) { - EngineMetrics.getInstance().logQueryProcessingResults(exportRecorder, caughtException); - } - if (caughtException == null) { - // must set result after ending the query so that onSuccess may resume / finalize a parent query - setResult(localResult); - } - } - } - - private void maybeAssignErrorId() { - if (errorId == null) { - errorId = UuidCreator.toString(UuidCreator.getRandomBased()); - } - } - - private synchronized void onDependencyFailure(final ExportObject parent) { - errorId = parent.errorId; - if (parent.caughtException instanceof StatusRuntimeException) { - caughtException = parent.caughtException; - } - ExportNotification.State terminalState = ExportNotification.State.DEPENDENCY_FAILED; - - if (errorId == null) { - final String errorDetails; - switch (parent.state) { - case RELEASED: - terminalState = ExportNotification.State.DEPENDENCY_RELEASED; - errorDetails = "dependency released by user."; - break; - case CANCELLED: - terminalState = ExportNotification.State.DEPENDENCY_CANCELLED; - errorDetails = "dependency cancelled by user."; - break; - default: - // Note: the other error states should have non-null errorId - errorDetails = "dependency does not have its own error defined " + - "and is in an unexpected state: " + parent.state; - break; - } - - maybeAssignErrorId(); - failedDependencyLogIdentity = parent.logIdentity; - if (!(caughtException instanceof StatusRuntimeException)) { - log.error().append("Internal Error '").append(errorId).append("' ").append(errorDetails) - .endl(); - } - } - - setState(terminalState); - } - - /** - * Sets the final result for this export. - * - * @param result the export object - */ - private void setResult(final T result) { - if (this.result != null) { - throw new IllegalStateException("cannot setResult twice!"); - } - - // result is cleared on destroy; so don't set if it won't be called - if (!tryRetainReference()) { - return; - } - - try { - synchronized (this) { - // client may race a cancel with setResult - if (!isExportStateTerminal(state)) { - this.result = result; - if (result instanceof LivenessReferent && DynamicNode.notDynamicOrIsRefreshing(result)) { - manage((LivenessReferent) result); - } - setState(ExportNotification.State.EXPORTED); - } - } - } finally { - dropReference(); - } - } - - /** - * Releases this export; it will wait for the work to complete before releasing. - */ - public synchronized void release() { - if (session == null) { - throw new UnsupportedOperationException("Session-less exports cannot be released"); - } - if (state == ExportNotification.State.EXPORTED) { - if (isNonExport()) { - return; - } - setState(ExportNotification.State.RELEASED); - } else if (!isExportStateTerminal(state)) { - session.nonExport().require(this).submit(this::release); - } - } - - /** - * Releases this export; it will cancel the work and dependent exports proactively when possible. - */ - public synchronized void cancel() { - if (session == null) { - throw new UnsupportedOperationException("Session-less exports cannot be cancelled"); - } - if (state == ExportNotification.State.EXPORTED) { - if (isNonExport()) { - return; - } - setState(ExportNotification.State.RELEASED); - } else if (!isExportStateTerminal(state)) { - setState(ExportNotification.State.CANCELLED); - } - } - - @Override - protected synchronized void destroy() { - super.destroy(); - result = null; - // keep SREs since error propagation won't reference a real errorId on the server - if (!(caughtException instanceof StatusRuntimeException)) { - caughtException = null; - } - } - - /** - * @return an export notification representing current state - */ - private synchronized ExportNotification makeExportNotification() { - final ExportNotification.Builder builder = ExportNotification.newBuilder() - .setTicket(ExportTicketHelper.wrapExportIdInTicket(exportId)) - .setExportState(state); - - if (errorId != null) { - builder.setContext(errorId); - } - if (failedDependencyLogIdentity != null) { - builder.setDependentHandle(failedDependencyLogIdentity); - } - - return builder.build(); - } - } - - public void addExportListener(final StreamObserver observer) { - final int versionId; - final ExportListener listener; - synchronized (exportListeners) { - if (isExpired()) { - throw Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, "session has expired"); - } - - listener = new ExportListener(observer); - exportListeners.add(listener); - versionId = ++exportListenerVersion; - } - - listener.initialize(versionId); - } - - /** - * Remove an on-close callback bound to the life of the session. - * - * @param observer the observer to no longer be subscribed - * @return The item if it was removed, else null - */ - public StreamObserver removeExportListener(final StreamObserver observer) { - final MutableObject wrappedListener = new MutableObject<>(); - final boolean found = exportListeners.removeIf(wrap -> { - if (wrappedListener.getValue() != null) { - return false; - } - - final boolean matches = wrap.listener == observer; - if (matches) { - wrappedListener.setValue(wrap); - } - return matches; - }); - - if (found) { - wrappedListener.getValue().onRemove(); - } - - return found ? observer : null; - } - - @VisibleForTesting - public long numExportListeners() { - return exportListeners.size(); - } - - private class ExportListener { - private volatile boolean isClosed = false; - - private final StreamObserver listener; - - private ExportListener(final StreamObserver listener) { - this.listener = listener; - } - - /** - * Propagate the change to the listener. - * - * @param notification the notification to send - */ - public void notify(final ExportNotification notification) { - if (isClosed) { - return; - } - - try (final SafeCloseable ignored = LivenessScopeStack.open()) { - synchronized (listener) { - listener.onNext(notification); - } - } catch (final RuntimeException e) { - log.error().append("Failed to notify listener: ").append(e).endl(); - removeExportListener(listener); - } - } - - /** - * Perform the run and send initial export state to the listener. - */ - private void initialize(final int versionId) { - final String id = Integer.toHexString(System.identityHashCode(this)); - log.debug().append(logPrefix).append("refreshing listener ").append(id).endl(); - - for (final ExportObject export : exportMap) { - if (!export.tryRetainReference()) { - continue; - } - - try { - if (export.exportListenerVersion >= versionId) { - continue; - } - - // the export cannot change state while we are synchronized on it - // noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (export) { - // check again because of race to the lock - if (export.exportListenerVersion >= versionId) { - continue; - } - - // no need to notify on exports that can no longer be accessed - if (isExportStateTerminal(export.getState())) { - continue; - } - - notify(export.makeExportNotification()); - } - } finally { - export.dropReference(); - } - } - - // notify that the run has completed - notify(ExportNotification.newBuilder() - .setTicket(ExportTicketHelper.wrapExportIdInTicket(NON_EXPORT_ID)) - .setExportState(ExportNotification.State.EXPORTED) - .setContext("run is complete") - .build()); - log.debug().append(logPrefix).append("run complete for listener ").append(id).endl(); - } - - protected void onRemove() { - synchronized (this) { - if (isClosed) { - return; - } - isClosed = true; - } - - safelyComplete(listener); - } - } - - @FunctionalInterface - public interface ExportErrorHandler { - /** - * Notify the handler that the final state of this export failed. - * - * @param resultState the final state of the export - * @param errorContext an identifier to locate the details as to why the export failed - * @param dependentExportId an identifier for the export id of the dependent that caused the failure if - * applicable - */ - void onError(final ExportNotification.State resultState, - final String errorContext, - @Nullable final Exception cause, - @Nullable final String dependentExportId); - } - @FunctionalInterface - public interface ExportErrorGrpcHandler { - /** - * This error handler receives a grpc friendly {@link StatusRuntimeException} that can be directly sent to - * {@link StreamObserver#onError}. - * - * @param notification the notification to forward to the grpc client - */ - void onError(final StatusRuntimeException notification); - } - - public class ExportBuilder { - private final int exportId; - private final ExportObject export; - - private boolean requiresSerialQueue; - private ExportErrorHandler errorHandler; - private Consumer successHandler; - - ExportBuilder(final int exportId) { - this.exportId = exportId; - - if (exportId == NON_EXPORT_ID) { - this.export = new ExportObject<>(SessionState.this.errorTransformer, SessionState.this, NON_EXPORT_ID); - } else { - // noinspection unchecked - this.export = (ExportObject) exportMap.putIfAbsent(exportId, EXPORT_OBJECT_VALUE_FACTORY); - } - } - - /** - * Set the performance recorder to resume when running this export. - * - * @param queryPerformanceRecorder the performance recorder - * @return this builder - */ - public ExportBuilder queryPerformanceRecorder( - @NotNull final QueryPerformanceRecorder queryPerformanceRecorder) { - export.setQueryPerformanceRecorder(queryPerformanceRecorder); - return this; - } - - /** - * Some exports must happen serially w.r.t. other exports. For example, an export that acquires the exclusive - * UGP lock. We enqueue these dependencies independently of the otherwise regularly concurrent exports. - * - * @return this builder - */ - public ExportBuilder requiresSerialQueue() { - requiresSerialQueue = true; - return this; - } - - /** - * Invoke this method to set the required dependencies for this export. A parent may be null to simplify usage - * of optional export dependencies. - * - * @param dependencies the parent dependencies - * @return this builder - */ - public ExportBuilder require(final ExportObject... dependencies) { - export.setDependencies(List.of(dependencies)); - return this; - } - - /** - * Invoke this method to set the required dependencies for this export. A parent may be null to simplify usage - * of optional export dependencies. - * - * @param dependencies the parent dependencies - * @return this builder - */ - public ExportBuilder require(final List> dependencies) { - export.setDependencies(List.copyOf(dependencies)); - return this; - } - - /** - * Invoke this method to set the error handler to be notified if this export fails. Only one error handler may - * be set. Exactly one of the onError and onSuccess handlers will be invoked. - *

- * Not synchronized, it is expected that the provided callback handles thread safety itself. - * - * @param errorHandler the error handler to be notified - * @return this builder - */ - public ExportBuilder onError(final ExportErrorHandler errorHandler) { - if (this.errorHandler != null) { - throw new IllegalStateException("error handler already set"); - } else if (export.hasHadWorkSet) { - throw new IllegalStateException("error handler must be set before work is submitted"); - } - this.errorHandler = errorHandler; - return this; - } - - /** - * Invoke this method to set the error handler to be notified if this export fails. Only one error handler may - * be set. Exactly one of the onError and onSuccess handlers will be invoked. - *

- * Not synchronized, it is expected that the provided callback handles thread safety itself. - * - * @param errorHandler the error handler to be notified - * @return this builder - */ - public ExportBuilder onErrorHandler(final ExportErrorGrpcHandler errorHandler) { - return onError(((resultState, errorContext, cause, dependentExportId) -> { - if (cause instanceof StatusRuntimeException) { - errorHandler.onError((StatusRuntimeException) cause); - return; - } - - final String dependentStr = dependentExportId == null ? "" - : (" (related parent export id: " + dependentExportId + ")"); - if (cause == null) { - if (resultState == ExportNotification.State.CANCELLED) { - errorHandler.onError(Exceptions.statusRuntimeException(Code.CANCELLED, - "Export is cancelled" + dependentStr)); - } else { - errorHandler.onError(Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, - "Export in state " + resultState + dependentStr)); - } - } else { - errorHandler.onError(Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, - "Details Logged w/ID '" + errorContext + "'" + dependentStr)); - } - })); - } - - /** - * Invoke this method to set the error handler to be notified if this export fails. Only one error handler may - * be set. This is a convenience method for use with {@link StreamObserver}. Exactly one of the onError and - * onSuccess handlers will be invoked. - *

- * Invoking onError will be synchronized on the StreamObserver instance, so callers can rely on that mechanism - * to deal with more than one thread trying to write to the stream. - * - * @param streamObserver the streamObserver to be notified of any error - * @return this builder - */ - public ExportBuilder onError(StreamObserver streamObserver) { - return onErrorHandler(statusRuntimeException -> { - safelyError(streamObserver, statusRuntimeException); - }); - } - - /** - * Invoke this method to set the onSuccess handler to be notified if this export succeeds. Only one success - * handler may be set. Exactly one of the onError and onSuccess handlers will be invoked. - *

- * Not synchronized, it is expected that the provided callback handles thread safety itself. - * - * @param successHandler the onSuccess handler to be notified - * @return this builder - */ - public ExportBuilder onSuccess(final Consumer successHandler) { - if (this.successHandler != null) { - throw new IllegalStateException("success handler already set"); - } else if (export.hasHadWorkSet) { - throw new IllegalStateException("success handler must be set before work is submitted"); - } - this.successHandler = successHandler; - return this; - } - - /** - * Invoke this method to set the onSuccess handler to be notified if this export succeeds. Only one success - * handler may be set. Exactly one of the onError and onSuccess handlers will be invoked. - *

- * Not synchronized, it is expected that the provided callback handles thread safety itself. - * - * @param successHandler the onSuccess handler to be notified - * @return this builder - */ - public ExportBuilder onSuccess(final Runnable successHandler) { - return onSuccess(ignored -> successHandler.run()); - } - - /** - * This method is the final method for submitting an export to the session. The provided callable is enqueued on - * the scheduler when all dependencies have been satisfied. Only the dependencies supplied to the builder are - * guaranteed to be resolved when the exportMain is executing. - *

- * Warning! It is the SessionState owner's responsibility to wait to release any dependency until after this - * exportMain callable/runnable has complete. - * - * @param exportMain the callable that generates the export - * @return the submitted export object - */ - public ExportObject submit(final Callable exportMain) { - export.setWork(exportMain, errorHandler, successHandler, requiresSerialQueue); - return export; - } - - /** - * This method is the final method for submitting an export to the session. The provided runnable is enqueued on - * the scheduler when all dependencies have been satisfied. Only the dependencies supplied to the builder are - * guaranteed to be resolved when the exportMain is executing. - *

- * Warning! It is the SessionState owner's responsibility to wait to release any dependency until after this - * exportMain callable/runnable has complete. - * - * @param exportMain the runnable to execute once dependencies have resolved - * @return the submitted export object - */ - public ExportObject submit(final Runnable exportMain) { - return submit(() -> { - exportMain.run(); - return null; - }); - } - - /** - * @return the export object that this builder is building - */ - public ExportObject getExport() { - return export; - } - - /** - * @return the export id of this export or {@link SessionState#NON_EXPORT_ID} if is a non-export - */ - public int getExportId() { - return exportId; - } - } - - private static final KeyedIntObjectKey> EXPORT_OBJECT_ID_KEY = - new KeyedIntObjectKey.BasicStrict>() { - @Override - public int getIntKey(final ExportObject exportObject) { - return exportObject.exportId; - } - }; - - private final KeyedIntObjectHash.ValueFactory> EXPORT_OBJECT_VALUE_FACTORY = - new KeyedIntObjectHash.ValueFactory.Strict>() { - @Override - public ExportObject newValue(final int key) { - if (isExpired()) { - throw Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, "session has expired"); - } - - return new ExportObject<>(SessionState.this.errorTransformer, SessionState.this, key); - } - }; -}