Skip to content

Commit

Permalink
[SPARK-51047][SS][TESTS] Add tests to verify scan ordering for non-ze…
Browse files Browse the repository at this point in the history
…ro start ordinals as well as non-ascending ordinals

### What changes were proposed in this pull request?
Add tests to verify scan ordering for non-zero start ordinals as well as non-ascending ordinals

### Why are the changes needed?
Improve coverage in this area

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Test only change

```
[info] Run completed in 8 minutes, 33 seconds.
[info] Total number of tests run: 1032
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1032, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #49747 from anishshri-db/task/SPARK-51047.

Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
anishshri-db authored and HeartSaVioR committed Feb 1, 2025
1 parent 23704d5 commit 5cd25fc
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ class UnsafeRowDataEncoder(
writer.resetRowWriter()
rangeScanKeyFieldsWithOrdinal.zipWithIndex.foreach { case (fieldWithOrdinal, idx) =>
val field = fieldWithOrdinal._1
// We must use idx here since we are already operating on the prefix which
// already has the relevant range ordinals projected to the front.
val value = row.get(idx, field.dataType)
// Note that we cannot allocate a smaller buffer here even if the value is null
// because the effective byte array is considered variable size and needs to have
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,63 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
}
}

Seq(true, false).foreach { colFamiliesEnabled =>
test(s"rocksdb range scan - variable size non-ordering columns with non-zero start ordinal " +
s"with colFamiliesEnabled=$colFamiliesEnabled") {

tryWithProviderResource(newStoreProvider(keySchema,
RangeKeyScanStateEncoderSpec(
keySchema, Seq(1)), colFamiliesEnabled)) { provider =>

def getRandStr(): String = Random.alphanumeric.filter(_.isLetter)
.take(Random.nextInt() % 10 + 1).mkString

val store = provider.getStore(0)

// use non-default col family if column families are enabled
val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
if (colFamiliesEnabled) {
store.createColFamilyIfAbsent(cfName,
keySchema, valueSchema,
RangeKeyScanStateEncoderSpec(keySchema, Seq(1)))
}

val timerTimestamps = Seq(931, 8000, 452300, 4200, -1, 90, 1, 2, 8,
-230, -14569, -92, -7434253, 35, 6, 9, -323, 5)
timerTimestamps.foreach { ts =>
val keyRow = dataToKeyRow(getRandStr(), ts)
val valueRow = dataToValueRow(1)
store.put(keyRow, valueRow, cfName)
assert(valueRowToData(store.get(keyRow, cfName)) === 1)
}

val result = store.iterator(cfName).map { kv =>
val key = keyRowToData(kv.key)
key._2
}.toSeq
assert(result === timerTimestamps.sorted)
store.commit()

// test with a different set of power of 2 timestamps
val store1 = provider.getStore(1)
val timerTimestamps1 = Seq(-32, -64, -256, 64, 32, 1024, 4096, 0)
timerTimestamps1.foreach { ts =>
val keyRow = dataToKeyRow(getRandStr(), ts)
val valueRow = dataToValueRow(1)
store1.put(keyRow, valueRow, cfName)
assert(valueRowToData(store1.get(keyRow, cfName)) === 1)
}

val result1 = store1.iterator(cfName).map { kv =>
val key = keyRowToData(kv.key)
key._2
}.toSeq
assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted)
store1.commit()
}
}
}

testWithColumnFamiliesAndEncodingTypes(
"rocksdb range scan - variable size non-ordering columns with " +
"double type values are supported",
Expand Down Expand Up @@ -453,6 +510,67 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
}
}

Seq(true, false).foreach { colFamiliesEnabled =>
Seq(Seq(1, 2), Seq(2, 1)).foreach { sortIndexes =>
test(s"rocksdb range scan multiple ordering columns - with non-zero start ordinal - " +
s"variable size non-ordering columns with colFamiliesEnabled=$colFamiliesEnabled " +
s"sortIndexes=${sortIndexes.mkString(",")}") {

val testSchema: StructType = StructType(
Seq(StructField("key1", StringType, false),
StructField("key2", LongType, false),
StructField("key3", IntegerType, false)))

val schemaProj = UnsafeProjection.create(Array[DataType](StringType, LongType, IntegerType))

tryWithProviderResource(newStoreProvider(testSchema,
RangeKeyScanStateEncoderSpec(testSchema, sortIndexes), colFamiliesEnabled)) { provider =>
val store = provider.getStore(0)

val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
if (colFamiliesEnabled) {
store.createColFamilyIfAbsent(cfName,
testSchema, valueSchema,
RangeKeyScanStateEncoderSpec(testSchema, sortIndexes))
}

val timerTimestamps = Seq((931L, 10), (8000L, 40), (452300L, 1), (4200L, 68), (90L, 2000),
(1L, 27), (1L, 394), (1L, 5), (3L, 980),
(-1L, 232), (-1L, 3455), (-6109L, 921455), (-9808344L, 1), (-1020L, 2),
(35L, 2112), (6L, 90118), (9L, 95118), (6L, 87210), (-4344L, 2323), (-3122L, 323))
timerTimestamps.foreach { ts =>
// order by long col first and then by int col
val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](UTF8String
.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString), ts._1,
ts._2)))
val valueRow = dataToValueRow(1)
store.put(keyRow, valueRow, cfName)
assert(valueRowToData(store.get(keyRow, cfName)) === 1)
}

val result = store.iterator(cfName).map { kv =>
val keyRow = kv.key
(keyRow.getLong(1), keyRow.getInt(2))
}.toSeq

def getOrderedTs(
orderedInput: Seq[(Long, Int)],
sortIndexes: Seq[Int]): Seq[(Long, Int)] = {
sortIndexes match {
case Seq(1, 2) => orderedInput.sortBy(x => (x._1, x._2))
case Seq(2, 1) => orderedInput.sortBy(x => (x._2, x._1))
case _ => throw new IllegalArgumentException(s"Invalid sortIndexes: " +
s"${sortIndexes.mkString(",")}")
}
}

assert(result === getOrderedTs(timerTimestamps, sortIndexes))
store.commit()
}
}
}
}

testWithColumnFamiliesAndEncodingTypes(
"rocksdb range scan multiple ordering columns - variable size " +
s"non-ordering columns",
Expand Down Expand Up @@ -1065,97 +1183,113 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
assert(exception.getMessage.contains("Found long, expecting union"))
}

testWithColumnFamiliesAndEncodingTypes(
"rocksdb range scan multiple non-contiguous ordering columns",
TestWithBothChangelogCheckpointingEnabledAndDisabled ) { colFamiliesEnabled =>
val testSchema: StructType = StructType(
Seq(
StructField("ordering1", LongType, false),
StructField("key2", StringType, false),
StructField("ordering2", IntegerType, false),
StructField("string2", StringType, false),
StructField("ordering3", DoubleType, false)
Seq(Seq(0, 1, 2), Seq(0, 2, 1), Seq(2, 1, 0), Seq(2, 0, 1)).foreach { sortIndexes =>
testWithColumnFamiliesAndEncodingTypes(
s"rocksdb range scan multiple non-contiguous ordering columns " +
s"and sortIndexes=${sortIndexes.mkString(",")}",
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
val testSchema: StructType = StructType(
Seq(
StructField("ordering1", LongType, false),
StructField("key2", StringType, false),
StructField("ordering2", IntegerType, false),
StructField("string2", StringType, false),
StructField("ordering3", DoubleType, false)
)
)
)

val testSchemaProj = UnsafeProjection.create(Array[DataType](
val testSchemaProj = UnsafeProjection.create(Array[DataType](
immutable.ArraySeq.unsafeWrapArray(testSchema.fields.map(_.dataType)): _*))
val rangeScanOrdinals = Seq(0, 2, 4)

tryWithProviderResource(
newStoreProvider(
testSchema,
RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
colFamiliesEnabled
)
) { provider =>
val store = provider.getStore(0)
// Multiply by 2 to get the actual ordinals in the row
val rangeScanOrdinals = sortIndexes.map(_ * 2)

val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
if (colFamiliesEnabled) {
store.createColFamilyIfAbsent(
cfName,
tryWithProviderResource(
newStoreProvider(
testSchema,
valueSchema,
RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals)
RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
colFamiliesEnabled
)
}
) { provider =>
val store = provider.getStore(0)

val orderedInput = Seq(
// Make sure that the first column takes precedence, even if the
// later columns are greater
(-2L, 0, 99.0),
(-1L, 0, 98.0),
(0L, 0, 97.0),
(2L, 0, 96.0),
// Make sure that the second column takes precedence, when the first
// column is all the same
(3L, -2, -1.0),
(3L, -1, -2.0),
(3L, 0, -3.0),
(3L, 2, -4.0),
// Finally, make sure that the third column takes precedence, when the
// first two ordering columns are the same.
(4L, -1, -127.0),
(4L, -1, 0.0),
(4L, -1, 64.0),
(4L, -1, 127.0)
)
val scrambledInput = Random.shuffle(orderedInput)

scrambledInput.foreach { record =>
val keyRow = testSchemaProj.apply(
new GenericInternalRow(
Array[Any](
record._1,
UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
record._2,
UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
record._3
)
val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
if (colFamiliesEnabled) {
store.createColFamilyIfAbsent(
cfName,
testSchema,
valueSchema,
RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals)
)
}

val orderedInput = Seq(
// Make sure that the first column takes precedence, even if the
// later columns are greater
(-2L, 0, 99.0),
(-1L, 0, 98.0),
(0L, 0, 97.0),
(2L, 0, 96.0),
// Make sure that the second column takes precedence, when the first
// column is all the same
(3L, -2, -1.0),
(3L, -1, -2.0),
(3L, 0, -3.0),
(3L, 2, -4.0),
// Finally, make sure that the third column takes precedence, when the
// first two ordering columns are the same.
(4L, -1, -127.0),
(4L, -1, 0.0),
(4L, -1, 64.0),
(4L, -1, 127.0)
)
val scrambledInput = Random.shuffle(orderedInput)

scrambledInput.foreach { record =>
val keyRow = testSchemaProj.apply(
new GenericInternalRow(
Array[Any](
record._1,
UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
record._2,
UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
record._3
)
)
)

// The value is just a "dummy" value of 1
val valueRow = dataToValueRow(1)
store.put(keyRow, valueRow, cfName)
assert(valueRowToData(store.get(keyRow, cfName)) === 1)
}
// The value is just a "dummy" value of 1
val valueRow = dataToValueRow(1)
store.put(keyRow, valueRow, cfName)
assert(valueRowToData(store.get(keyRow, cfName)) === 1)
}

val result = store
.iterator(cfName)
.map { kv =>
val keyRow = kv.key
val key = (keyRow.getLong(0), keyRow.getInt(2), keyRow.getDouble(4))
(key._1, key._2, key._3)
val result = store
.iterator(cfName)
.map { kv =>
val keyRow = kv.key
(keyRow.getLong(0), keyRow.getInt(2), keyRow.getDouble(4))
}
.toSeq

def getOrderedInput(
orderedInput: Seq[(Long, Int, Double)],
sortIndexes: Seq[Int]): Seq[(Long, Int, Double)] = {
sortIndexes match {
case Seq(0, 1, 2) => orderedInput.sortBy(x => (x._1, x._2, x._3))
case Seq(0, 2, 1) => orderedInput.sortBy(x => (x._1, x._3, x._2))
case Seq(2, 1, 0) => orderedInput.sortBy(x => (x._3, x._2, x._1))
case Seq(2, 0, 1) => orderedInput.sortBy(x => (x._3, x._1, x._2))
case _ => throw new IllegalArgumentException(s"Invalid sortIndexes: " +
s"${sortIndexes.mkString(",")}")
}
}
.toSeq

assert(result === orderedInput)
assert(result === getOrderedInput(orderedInput, sortIndexes))
store.commit()
}
}
}


testWithColumnFamiliesAndEncodingTypes(
"rocksdb range scan multiple ordering columns - variable size " +
s"non-ordering columns with null values in first ordering column",
Expand Down

0 comments on commit 5cd25fc

Please sign in to comment.