Skip to content

Commit

Permalink
Merge commit '1bf9563' into add_cache_read_on_buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed May 3, 2024
2 parents 0d1bce4 + 1bf9563 commit 145bb38
Showing 1 changed file with 0 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,15 @@
import org.elasticsearch.common.Randomness;
import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.compute.aggregation.RateLongAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperatorTests;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.mapper.BlockDocValuesReader;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.SourceLoader;
import org.hamcrest.Matcher;
import org.junit.After;

Expand Down Expand Up @@ -248,163 +243,4 @@ record Doc(String pod, long timestamp, long requests) {
}

record Group(String tsidHash, long timestampInterval) {}

// TODO: in a follow up add support for ordinal based time series grouping operator
// (and then remove this test)
// (ordinal based can only group by one field and never includes timestamp)
public void testBasicRateOrdinalBased() {
long[] v1 = { 1, 1, 3, 0, 2, 9, 21, 3, 7, 7, 9, 12 };
long[] t1 = { 1, 5, 11, 20, 21, 59, 88, 91, 92, 97, 99, 112 };

long[] v2 = { 7, 2, 0, 11, 24, 0, 4, 1, 10, 2 };
long[] t2 = { 1, 2, 4, 5, 6, 8, 10, 11, 12, 14 };

long[] v3 = { 0, 1, 0, 1, 1, 4, 2, 2, 2, 2, 3, 5, 5 };
long[] t3 = { 2, 3, 5, 7, 8, 9, 10, 12, 14, 15, 18, 20, 22 };
List<Pod> pods = List.of(new Pod("p1", t1, v1), new Pod("p2", t2, v2), new Pod("p3", t3, v3));
long unit = between(1, 5);
Map<String, Double> actualRates = runRateTestOrdinalBased(pods, TimeValue.timeValueMillis(unit));
assertThat(actualRates, equalTo(Map.of("p1", 35.0 * unit / 111.0, "p2", 42.0 * unit / 13.0, "p3", 10.0 * unit / 20.0)));
}

// TODO: in a follow up add support for ordinal based time series grouping operator
// (and then remove this test)
// (ordinal based can only group by one field and never includes timestamp)
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/107568")
public void testRandomRateOrdinalBased() {
int numPods = between(1, 10);
List<Pod> pods = new ArrayList<>();
Map<String, Double> expectedRates = new HashMap<>();
TimeValue unit = TimeValue.timeValueSeconds(1);
for (int p = 0; p < numPods; p++) {
int numValues = between(2, 100);
long[] values = new long[numValues];
long[] times = new long[numValues];
long t = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
for (int i = 0; i < numValues; i++) {
values[i] = randomIntBetween(0, 100);
t += TimeValue.timeValueSeconds(between(1, 10)).millis();
times[i] = t;
}
Pod pod = new Pod("p" + p, times, values);
pods.add(pod);
if (numValues == 1) {
expectedRates.put(pod.name, null);
} else {
expectedRates.put(pod.name, pod.expectedRate(unit));
}
}
Map<String, Double> actualRates = runRateTestOrdinalBased(pods, unit);
assertThat(actualRates, equalTo(expectedRates));
}

Map<String, Double> runRateTestOrdinalBased(List<Pod> pods, TimeValue unit) {
long unitInMillis = unit.millis();
record Doc(String pod, long timestamp, long requests) {

}
var sourceOperatorFactory = createTimeSeriesSourceOperator(
directory,
r -> this.reader = r,
Integer.MAX_VALUE,
between(1, 100),
randomBoolean(),
TimeValue.ZERO,
writer -> {
List<Doc> docs = new ArrayList<>();
for (Pod pod : pods) {
for (int i = 0; i < pod.times.length; i++) {
docs.add(new Doc(pod.name, pod.times[i], pod.values[i]));
}
}
Randomness.shuffle(docs);
for (Doc doc : docs) {
writeTS(writer, doc.timestamp, new Object[] { "pod", doc.pod }, new Object[] { "requests", doc.requests });
}
return docs.size();
}
);
var ctx = driverContext();
HashAggregationOperator finalHash = new HashAggregationOperator(
List.of(new RateLongAggregatorFunctionSupplier(List.of(1, 2, 3), unitInMillis).groupingAggregatorFactory(AggregatorMode.FINAL)),
() -> BlockHash.build(
List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF)),
ctx.blockFactory(),
randomIntBetween(1, 1000),
randomBoolean()
),
ctx
);
List<Page> results = new ArrayList<>();
var requestsField = new NumberFieldMapper.NumberFieldType("requests", NumberFieldMapper.NumberType.LONG);
var podField = new KeywordFieldMapper.KeywordFieldType("pod");
if (randomBoolean()) {
HashAggregationOperator initialHash = new HashAggregationOperator(
List.of(
new RateLongAggregatorFunctionSupplier(List.of(5, 2), unitInMillis).groupingAggregatorFactory(AggregatorMode.INITIAL)
),
() -> BlockHash.build(
List.of(new BlockHash.GroupSpec(4, ElementType.BYTES_REF)),
ctx.blockFactory(),
randomIntBetween(1, 1000),
randomBoolean()
),
ctx
);
OperatorTestCase.runDriver(
new Driver(
ctx,
sourceOperatorFactory.get(ctx),
List.of(
ValuesSourceReaderOperatorTests.factory(reader, podField, ElementType.BYTES_REF).get(ctx),
ValuesSourceReaderOperatorTests.factory(reader, requestsField, ElementType.LONG).get(ctx),
initialHash,
finalHash
),
new TestResultPageSinkOperator(results::add),
() -> {}
)
);
} else {
var blockLoader = new BlockDocValuesReader.BytesRefsFromOrdsBlockLoader("pod");
var shardContext = new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE);
var ordinalGrouping = new OrdinalsGroupingOperator(
shardIdx -> blockLoader,
List.of(shardContext),
ElementType.BYTES_REF,
0,
"pod",
List.of(
new RateLongAggregatorFunctionSupplier(List.of(4, 2), unitInMillis).groupingAggregatorFactory(AggregatorMode.INITIAL)
),
randomIntBetween(1, 1000),
ctx
);
OperatorTestCase.runDriver(
new Driver(
ctx,
sourceOperatorFactory.get(ctx),
List.of(
ValuesSourceReaderOperatorTests.factory(reader, requestsField, ElementType.LONG).get(ctx),
ordinalGrouping,
finalHash
),
new TestResultPageSinkOperator(results::add),
() -> {}
)
);
}
Map<String, Double> rates = new HashMap<>();
for (Page result : results) {
BytesRefBlock keysBlock = result.getBlock(0);
DoubleBlock ratesBlock = result.getBlock(1);
for (int i = 0; i < result.getPositionCount(); i++) {
var key = keysBlock.getBytesRef(i, new BytesRef()).utf8ToString();
rates.put(key, ratesBlock.getDouble(i));
}
result.releaseBlocks();
}
return rates;
}

}

0 comments on commit 145bb38

Please sign in to comment.