Skip to content

Commit

Permalink
More tests for range partition parallel indexing (#9232) (#9236)
Browse files Browse the repository at this point in the history
Add more unit tests for range partition native batch parallel indexing.

Also, fix a bug where ParallelIndexPhaseRunner incorrectly thinks that
identical collected DimensionDistributionReports are not equal due to
not overriding equals() in DimensionDistributionReport.
  • Loading branch information
ccaominh authored and jihoonson committed Jan 21, 2020
1 parent e0e6d98 commit c0f4dfb
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 13 deletions.
5 changes: 5 additions & 0 deletions indexing-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.joda.time.Interval;

import java.util.Map;
import java.util.Objects;

public class DimensionDistributionReport implements SubTaskReport
{
Expand Down Expand Up @@ -65,4 +66,24 @@ public String toString()
", intervalToDistribution=" + intervalToDistribution +
'}';
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DimensionDistributionReport that = (DimensionDistributionReport) o;
return Objects.equals(taskId, that.taskId) &&
Objects.equals(intervalToDistribution, that.intervalToDistribution);
}

@Override
public int hashCode()
{
return Objects.hash(taskId, intervalToDistribution);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.io.IOException;
import java.util.Comparator;
import java.util.Objects;

/**
* Counts approximate frequencies of strings.
Expand Down Expand Up @@ -137,6 +138,40 @@ public String toString()
'}';
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StringSketch that = (StringSketch) o;

// ParallelIndexPhaseRunner.collectReport() uses equals() to check subtasks send identical reports if they retry.
// However, ItemsSketch does not override equals(): https://github.com/apache/incubator-datasketches-java/issues/140
//
// Since ItemsSketch has built-in non-determinism, only rely on ItemsSketch properties that are deterministic. This
// check is best-effort as it is possible for it to return true for sketches that contain different values.
return delegate.getK() == that.delegate.getK() &&
delegate.getN() == that.delegate.getN() &&
Objects.equals(delegate.getMaxValue(), that.delegate.getMaxValue()) &&
Objects.equals(delegate.getMinValue(), that.delegate.getMinValue());
}

@Override
public int hashCode()
{
// See comment in equals() regarding ItemsSketch.
return Objects.hash(
delegate.getK(),
delegate.getN(),
delegate.getMaxValue(),
delegate.getMinValue()
);
}

ItemsSketch<String> getDelegate()
{
return delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,17 @@ Set<DataSegment> runTestTask(
Interval interval,
File inputDir,
String filter,
DimensionBasedPartitionsSpec partitionsSpec
DimensionBasedPartitionsSpec partitionsSpec,
int maxNumConcurrentSubTasks
) throws Exception
{
final ParallelIndexSupervisorTask task = newTask(
parseSpec,
interval,
inputDir,
filter,
partitionsSpec
partitionsSpec,
maxNumConcurrentSubTasks
);

actionClient = createActionClient(task);
Expand All @@ -137,7 +139,8 @@ private ParallelIndexSupervisorTask newTask(
Interval interval,
File inputDir,
String filter,
DimensionBasedPartitionsSpec partitionsSpec
DimensionBasedPartitionsSpec partitionsSpec,
int maxNumConcurrentSubTasks
)
{
GranularitySpec granularitySpec = new UniformGranularitySpec(
Expand All @@ -163,7 +166,7 @@ private ParallelIndexSupervisorTask newTask(
null,
null,
null,
2,
maxNumConcurrentSubTasks,
null,
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
import org.apache.druid.java.util.common.Intervals;
Expand Down Expand Up @@ -52,4 +53,12 @@ public void serializesDeserializes()
{
TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
}

@Test
public void abidesEqualsContract()
{
EqualsVerifier.forClass(DimensionDistributionReport.class)
.usingGetClass()
.verify();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
false,
0
);
private static final int MAX_NUM_CONCURRENT_SUB_TASKS = 2;

@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
public static Iterable<Object[]> constructorFeeder()
Expand Down Expand Up @@ -129,7 +130,8 @@ public void testRun() throws Exception
Intervals.of("2017/2018"),
inputDir,
"test_*",
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2"))
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
MAX_NUM_CONCURRENT_SUB_TASKS
);
assertHashedPartition(publishedSegments);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,30 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
0
);

@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{LockGranularity.TIME_CHUNK, false},
new Object[]{LockGranularity.TIME_CHUNK, true},
new Object[]{LockGranularity.SEGMENT, true}
new Object[]{LockGranularity.TIME_CHUNK, false, 2},
new Object[]{LockGranularity.TIME_CHUNK, true, 2},
new Object[]{LockGranularity.SEGMENT, true, 2},
new Object[]{LockGranularity.SEGMENT, true, 1} // currently spawns subtask instead of running in supervisor
);
}

private File inputDir;
private SetMultimap<Interval, String> intervalToDim1;

public RangePartitionMultiPhaseParallelIndexingTest(LockGranularity lockGranularity, boolean useInputFormatApi)
private final int maxNumConcurrentSubTasks;

public RangePartitionMultiPhaseParallelIndexingTest(
LockGranularity lockGranularity,
boolean useInputFormatApi,
int maxNumConcurrentSubTasks
)
{
super(lockGranularity, useInputFormatApi);
this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
}

@Override
Expand Down Expand Up @@ -169,7 +177,8 @@ public void createsCorrectRangePartitions() throws Exception
null,
DIM1,
false
)
),
maxNumConcurrentSubTasks
);
assertRangePartitions(publishedSegments);
}
Expand Down Expand Up @@ -362,7 +371,6 @@ private TestPartialRangeSegmentGenerateRunner(
}
}


private static class TestPartialGenericSegmentMergeParallelIndexTaskRunner
extends PartialGenericSegmentMergeParallelIndexTaskRunner
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task.batch.parallel.distribution;

import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.datasketches.quantiles.ItemsSketch;
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -69,6 +70,15 @@ public void serializesDeserializes()
target.put(MAX_STRING);
TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
}

@Test
public void abidesEqualsContract()
{
EqualsVerifier.forClass(StringSketch.class)
.usingGetClass()
.withNonnullFields("delegate")
.verify();
}
}

public static class PutTest
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1201,7 +1201,7 @@
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<version>3.1.10</version>
<version>3.1.11</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down

0 comments on commit c0f4dfb

Please sign in to comment.