Skip to content

Commit

Permalink
Fix Aggregate Request Charge (#366)
Browse files Browse the repository at this point in the history
  • Loading branch information
southpolesteve authored Jul 9, 2019
1 parent fb1c081 commit bff1cb9
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Response } from "../../request";
import { AverageAggregator, CountAggregator, MaxAggregator, MinAggregator, SumAggregator } from "../Aggregators";
import { ExecutionContext } from "../ExecutionContext";
import { getInitialHeader } from "../headerUtils";
import { getInitialHeader, mergeHeaders } from "../headerUtils";
import { CosmosHeaders } from "../index";

/** @hidden */
Expand All @@ -11,6 +11,7 @@ export class AggregateEndpointComponent implements ExecutionContext {
private aggregateValuesIndex: number;
private localAggregators: any[];
private started: boolean;
private respHeaders: CosmosHeaders;

/**
* Represents an endpoint in handling aggregate queries.
Expand All @@ -22,6 +23,7 @@ export class AggregateEndpointComponent implements ExecutionContext {
// TODO: any
this.executionContext = executionContext;
this.localAggregators = [];
this.respHeaders = getInitialHeader();
aggregateOperators.forEach((aggregateOperator: string) => {
switch (aggregateOperator) {
case "Average":
Expand Down Expand Up @@ -83,13 +85,24 @@ export class AggregateEndpointComponent implements ExecutionContext {
const { result: item, headers } = await this.executionContext.nextItem();
if (item === undefined) {
// no more results
return { result: this.toArrayTempResources, headers };
return { result: this.toArrayTempResources, headers: this.getAndResetActiveResponseHeaders() };
}

this.toArrayTempResources = this.toArrayTempResources.concat(item);
this.mergeWithActiveResponseHeaders(headers);
return this._getQueryResults();
}

private mergeWithActiveResponseHeaders(headers: CosmosHeaders) {
mergeHeaders(this.respHeaders, headers);
}

private getAndResetActiveResponseHeaders() {
const ret = this.respHeaders;
this.respHeaders = getInitialHeader();
return ret;
}

/**
* Execute a provided function on the next element in the AggregateEndpointComponent.
* @memberof AggregateEndpointComponent
Expand All @@ -106,7 +119,6 @@ export class AggregateEndpointComponent implements ExecutionContext {
this.aggregateValuesIndex < this.aggregateValues.length
? this.aggregateValues[++this.aggregateValuesIndex]
: undefined;

return { result: resource, headers: resHeaders };
}

Expand Down
23 changes: 18 additions & 5 deletions test/integration/aggregateQuery.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,29 +57,34 @@ describe("Aggregate Query", function() {
};

const validateToArray = async function(queryIterator: QueryIterator<any>, expectedResults: any) {
const { resources: results } = await queryIterator.fetchAll();
const { resources: results, requestCharge } = await queryIterator.fetchAll();
assert(requestCharge > 0, "request charge was not greater than zero");
assert.equal(results.length, expectedResults.length, "invalid number of results");
assert.equal(queryIterator.hasMoreResults(), false, "hasMoreResults: no more results is left");
return requestCharge;
};

const validateExecuteNextAndHasMoreResults = async function(
queryIterator: QueryIterator<any>,
options: any,
expectedResults: any[]
expectedResults: any[],
fetchAllRequestCharge: number
) {
const pageSize = options["maxItemCount"];
const listOfResultPages: any[] = [];
let totalFetchedResults: any[] = [];
let totalExecuteNextRequestCharge = 0;

while (totalFetchedResults.length <= expectedResults.length) {
const { resources: results } = await queryIterator.fetchNext();
const { resources: results, requestCharge } = await queryIterator.fetchNext();
listOfResultPages.push(results);

if (results === undefined || totalFetchedResults.length === expectedResults.length) {
break;
}

totalFetchedResults = totalFetchedResults.concat(results);
totalExecuteNextRequestCharge += requestCharge;

if (totalFetchedResults.length < expectedResults.length) {
// there are more results
Expand All @@ -96,6 +101,14 @@ describe("Aggregate Query", function() {
// no more results
validateResult(totalFetchedResults, expectedResults);
assert.equal(queryIterator.hasMoreResults(), false, "hasMoreResults: no more results is left");

assert(totalExecuteNextRequestCharge > 0);
const percentDifference =
Math.abs(fetchAllRequestCharge - totalExecuteNextRequestCharge) / totalExecuteNextRequestCharge;
assert(
percentDifference <= 0.01,
"difference between fetchAll request charge and executeNext request charge should be less than 1%"
);
};

const ValidateAsyncIterator = async function(queryIterator: QueryIterator<any>, expectedResults: any[]) {
Expand All @@ -118,9 +131,9 @@ describe("Aggregate Query", function() {
const options: FeedOptions = { maxDegreeOfParallelism: 2, maxItemCount: 1 };

const queryIterator = container.items.query(query, options);
await validateToArray(queryIterator, expectedResults);
const fetchAllRequestCharge = await validateToArray(queryIterator, expectedResults);
queryIterator.reset();
await validateExecuteNextAndHasMoreResults(queryIterator, options, expectedResults);
await validateExecuteNextAndHasMoreResults(queryIterator, options, expectedResults, fetchAllRequestCharge);
queryIterator.reset();
await ValidateAsyncIterator(queryIterator, expectedResults);
};
Expand Down

0 comments on commit bff1cb9

Please sign in to comment.