Skip to content

Commit

Permalink
Added abort signal support for dynamo readonly functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Auke Bruinsma authored and benhutchins committed Oct 13, 2023
1 parent c745667 commit d74b44a
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 40 deletions.
11 changes: 6 additions & 5 deletions src/batch-get.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { type Table } from './table'
import { buildProjectionExpression } from './query/projection-expression'
import { HelpfulError } from './errors'
import { type AttributeMap } from './interfaces'
import { type IRequestOptions } from './connections'

export class BatchGet<T extends Table> {
public static readonly MAX_BATCH_ITEMS = 100
Expand Down Expand Up @@ -69,7 +70,7 @@ export class BatchGet<T extends Table> {
return this
}

public async retrieve(): Promise<T[]> {
public async retrieve(requestOptions?: IRequestOptions): Promise<T[]> {
const chunkSize = this.atomicity ? BatchGet.MAX_TRANSACT_ITEMS : BatchGet.MAX_BATCH_ITEMS
return await Promise.all(
chunk(this.items, chunkSize).map(async (chunkedItems) => {
Expand Down Expand Up @@ -117,11 +118,11 @@ export class BatchGet<T extends Table> {
if (this.atomicity) {
output = await this.dynamo.transactGetItems({
TransactItems: transactItems,
})
}, requestOptions)
} else {
output = await this.dynamo.batchGetItem({
RequestItems: requestMap,
})
}, requestOptions)
}
} catch (ex) {
throw new HelpfulError(ex)
Expand Down Expand Up @@ -181,8 +182,8 @@ export class BatchGet<T extends Table> {
})
}

public async retrieveMapped(): Promise<Map<typeof Table, T[]>> {
const items = await this.retrieve()
public async retrieveMapped(requestOptions?: IRequestOptions): Promise<Map<typeof Table, T[]>> {
const items = await this.retrieve(requestOptions)
const map = new Map<typeof Table, T[]>()

for (const item of items) {
Expand Down
1 change: 1 addition & 0 deletions src/connections/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './connection'
export * from './dynamodb-connection'
export * from './request-options'
7 changes: 7 additions & 0 deletions src/connections/request-options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export interface IRequestOptions {
abortSignal?: AbortSignal
}

export function isRequestOptions(value: unknown): value is IRequestOptions {
return typeof value === 'object' && value != null && 'abortSignal' in value
}
17 changes: 9 additions & 8 deletions src/query/global-secondary-index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { type Filters as QueryFilters } from './filters'
import { QueryOutput } from './output'
import { buildProjectionExpression } from './projection-expression'
import { MagicSearch, type MagicSearchInput } from './search'
import { type IRequestOptions } from '../connections'

interface GlobalSecondaryIndexGetInput {
/**
Expand Down Expand Up @@ -102,7 +103,7 @@ export class GlobalSecondaryIndex<T extends Table> {
*
* Avoid use whenever you do not have uniqueness for the GlobalSecondaryIndex's HASH + RANGE.
*/
public async get(filters: QueryFilters<T>, input: GlobalSecondaryIndexGetInput = {}): Promise<T | undefined> {
public async get(filters: QueryFilters<T>, input: GlobalSecondaryIndexGetInput = {}, requestOptions?: IRequestOptions): Promise<T | undefined> {
if (!has(filters, this.metadata.hash.propertyName)) {
throw new QueryError('Cannot perform .get() on a GlobalSecondaryIndex without specifying a hash key value')
} else if (this.metadata.range != null && !has(filters, this.metadata.range.propertyName)) {
Expand All @@ -117,7 +118,7 @@ export class GlobalSecondaryIndex<T extends Table> {
// DynamoDB will start the search at the first match and limit means it will only process
// that document and return it, however, you cannot use any additional filters on this .get
// method; for that, you need to use .query()
const results = await this.query(filters, input)
const results = await this.query(filters, input, requestOptions)

if (results.count > 0) {
return results[0]
Expand Down Expand Up @@ -177,7 +178,7 @@ export class GlobalSecondaryIndex<T extends Table> {
return queryInput
}

public async query(filters: QueryFilters<T>, input?: GlobalSecondaryIndexQueryInput): Promise<QueryOutput<T>> {
public async query(filters: QueryFilters<T>, input?: GlobalSecondaryIndexQueryInput, requestOptions?: IRequestOptions): Promise<QueryOutput<T>> {
if (!has(filters, this.metadata.hash.propertyName)) {
throw new QueryError('Cannot perform a query on a GlobalSecondaryIndex without specifying a hash key value')
} else if (isArray(get(filters, this.metadata.hash.propertyName)) && get(filters, this.metadata.hash.propertyName)[0] !== '=') {
Expand All @@ -189,7 +190,7 @@ export class GlobalSecondaryIndex<T extends Table> {
let output: QueryCommandOutput

try {
output = await this.tableClass.schema.dynamo.query(queryInput)
output = await this.tableClass.schema.dynamo.query(queryInput, requestOptions)
} catch (ex) {
throw new HelpfulError(ex, this.tableClass, queryInput)
}
Expand Down Expand Up @@ -241,12 +242,12 @@ export class GlobalSecondaryIndex<T extends Table> {
* *WARNING*: In most circumstances this is not a good thing to do.
* This will return all the items in this index, does not perform well!
*/
public async scan(filters?: QueryFilters<T> | undefined | null, input: GlobalSecondaryIndexScanInput = {}): Promise<QueryOutput<T>> {
public async scan(filters?: QueryFilters<T> | undefined | null, input: GlobalSecondaryIndexScanInput = {}, requestOptions?: IRequestOptions): Promise<QueryOutput<T>> {
const scanInput = this.getScanInput(input, filters == null ? undefined : filters)
const hasProjection = scanInput.ProjectionExpression == null
let output: ScanCommandOutput
try {
output = await this.tableClass.schema.dynamo.scan(scanInput)
output = await this.tableClass.schema.dynamo.scan(scanInput, requestOptions)
} catch (ex) {
throw new HelpfulError(ex, this.tableClass, scanInput)
}
Expand All @@ -264,11 +265,11 @@ export class GlobalSecondaryIndex<T extends Table> {
*
* @see {@link https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ParallelScan}
*/
public async segmentedScan(filters: QueryFilters<T> | undefined | null, input: GlobalSecondaryIndexSegmentedScanInput): Promise<QueryOutput<T>> {
public async segmentedScan(filters: QueryFilters<T> | undefined | null, input: GlobalSecondaryIndexSegmentedScanInput, requestOptions?: IRequestOptions): Promise<QueryOutput<T>> {
const scans: Array<Promise<QueryOutput<T>>> = []
for (let i = 0; i < input.totalSegments; i++) {
input.segment = i
scans.push(this.scan(filters, input))
scans.push(this.scan(filters, input, requestOptions))
}

const scanOutputs = await Promise.all(scans)
Expand Down
9 changes: 5 additions & 4 deletions src/query/local-secondary-index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { buildQueryExpression } from './expression'
import { type Filters as QueryFilters } from './filters'
import { QueryOutput } from './output'
import { MagicSearch, type MagicSearchInput } from './search'
import { type IRequestOptions } from '../connections'

interface LocalSecondaryIndexQueryInput {
rangeOrder?: 'ASC' | 'DESC'
Expand Down Expand Up @@ -50,7 +51,7 @@ export class LocalSecondaryIndex<T extends Table> {
return queryInput
}

public async query(filters: QueryFilters<T>, input: LocalSecondaryIndexQueryInput = {}): Promise<QueryOutput<T>> {
public async query(filters: QueryFilters<T>, input: LocalSecondaryIndexQueryInput = {}, requestOptions?: IRequestOptions): Promise<QueryOutput<T>> {
if (!has(filters, this.tableClass.schema.primaryKey.hash.propertyName)) {
throw new QueryError('Cannot perform a query on a LocalSecondaryIndex without specifying a hash key value')
}
Expand All @@ -70,7 +71,7 @@ export class LocalSecondaryIndex<T extends Table> {
const hasProjection = queryInput.ProjectionExpression == null
let output: QueryCommandOutput
try {
output = await this.tableClass.schema.dynamo.query(queryInput)
output = await this.tableClass.schema.dynamo.query(queryInput, requestOptions)
} catch (ex) {
throw new HelpfulError(ex, this.tableClass, queryInput)
}
Expand All @@ -91,7 +92,7 @@ export class LocalSecondaryIndex<T extends Table> {
return scanInput
}

public async scan(filters: QueryFilters<T> | undefined | null, input: LocalSecondaryIndexScanInput = {}): Promise<QueryOutput<T>> {
public async scan(filters: QueryFilters<T> | undefined | null, input: LocalSecondaryIndexScanInput = {}, requestOptions?: IRequestOptions): Promise<QueryOutput<T>> {
const scanInput = this.getScanInput(input)
if (filters != null && Object.keys(filters).length > 0) {
// don't pass the index metadata, avoids KeyConditionExpression
Expand All @@ -103,7 +104,7 @@ export class LocalSecondaryIndex<T extends Table> {
const hasProjection = scanInput.ProjectionExpression == null
let output: ScanCommandOutput
try {
output = await this.tableClass.schema.dynamo.scan(scanInput)
output = await this.tableClass.schema.dynamo.scan(scanInput, requestOptions)
} catch (ex) {
throw new HelpfulError(ex, this.tableClass, scanInput)
}
Expand Down
26 changes: 14 additions & 12 deletions src/query/primary-key.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { type Filters as QueryFilters, type UpdateConditions } from './filters'
import { QueryOutput } from './output'
import { buildProjectionExpression } from './projection-expression'
import { MagicSearch, type MagicSearchInput } from './search'
import { type IRequestOptions, isRequestOptions } from '../connections'

type PrimaryKeyType = string | number | Date
// eslint-disable-next-line @typescript-eslint/no-invalid-void-type
Expand Down Expand Up @@ -127,10 +128,10 @@ export class PrimaryKey<T extends Table, HashKeyType extends PrimaryKeyType, Ran
*
* `.get(instanceOfTable)`
*/
public async get(filters: QueryFilters<T>, input?: PrimaryKeyGetInput): Promise<T | undefined>
public async get(hash: HashKeyType, range: RangeKeyType, input?: PrimaryKeyGetInput): Promise<T | undefined>
public async get(record: T, input?: PrimaryKeyGetInput): Promise<T | undefined>
public async get(hash: HashKeyType | T | QueryFilters<T>, range?: RangeKeyType | PrimaryKeyGetInput, input?: PrimaryKeyGetInput): Promise<T | undefined> {
public async get(filters: QueryFilters<T>, input?: PrimaryKeyGetInput, requestOptions?: IRequestOptions): Promise<T | undefined>
public async get(hash: HashKeyType, range: RangeKeyType, input?: PrimaryKeyGetInput, requestOptions?: IRequestOptions): Promise<T | undefined>
public async get(record: T, input?: PrimaryKeyGetInput, requestOptions?: IRequestOptions): Promise<T | undefined>
public async get(hash: HashKeyType | T | QueryFilters<T>, range?: RangeKeyType | PrimaryKeyGetInput, input?: PrimaryKeyGetInput | IRequestOptions, requestOptions?: IRequestOptions): Promise<T | undefined> {
let record: T

if (isDyngooseTableInstance(hash)) {
Expand All @@ -143,14 +144,15 @@ export class PrimaryKey<T extends Table, HashKeyType extends PrimaryKeyType, Ran
throw new QueryError('PrimaryKey.get called with unknown arguments')
}

const getGetInput: Partial<PrimaryKeyGetGetItemInput> = input == null ? ((range == null || isKeyValue(range)) ? {} : range as PrimaryKeyGetInput) : input
const options = isRequestOptions(requestOptions) ? requestOptions : isRequestOptions(input) ? input : undefined
const getGetInput: Partial<PrimaryKeyGetGetItemInput> = input == null ? ((range == null || isKeyValue(range)) ? {} : range as PrimaryKeyGetInput) : isRequestOptions(input) ? {} : input
getGetInput.key = record.getDynamoKey()
const getItemInput = this.getGetInput(getGetInput as PrimaryKeyGetGetItemInput)
const hasProjection = getItemInput.ProjectionExpression == null
let dynamoRecord: GetItemOutput

try {
dynamoRecord = await this.table.schema.dynamo.getItem(getItemInput)
dynamoRecord = await this.table.schema.dynamo.getItem(getItemInput, options)
} catch (ex) {
throw new HelpfulError(ex, this.table, getItemInput)
}
Expand All @@ -168,14 +170,14 @@ export class PrimaryKey<T extends Table, HashKeyType extends PrimaryKeyType, Ran
*
* @deprecated
*/
public async batchGet(inputs: Array<PrimaryKeyBatchInput<HashKeyType, RangeKeyType>>): Promise<T[]> {
public async batchGet(inputs: Array<PrimaryKeyBatchInput<HashKeyType, RangeKeyType>>, requestOptions?: IRequestOptions): Promise<T[]> {
const batch = new BatchGet<T>()

for (const input of inputs) {
batch.get(this.fromKey(input[0], input[1]))
}

return await batch.retrieve()
return await batch.retrieve(requestOptions)
}

/**
Expand Down Expand Up @@ -234,7 +236,7 @@ export class PrimaryKey<T extends Table, HashKeyType extends PrimaryKeyType, Ran
return queryInput
}

public async query(filters: QueryFilters<T>, input?: PrimaryKeyQueryInput): Promise<QueryOutput<T>> {
public async query(filters: QueryFilters<T>, input?: PrimaryKeyQueryInput, requestOptions?: IRequestOptions): Promise<QueryOutput<T>> {
if (!has(filters, this.metadata.hash.propertyName)) {
throw new QueryError('Cannot perform a query on the PrimaryKey index without specifying a hash key value')
} else if (isArray(get(filters, this.metadata.hash.propertyName)) && get(filters, this.metadata.hash.propertyName)[0] !== '=') {
Expand All @@ -251,7 +253,7 @@ export class PrimaryKey<T extends Table, HashKeyType extends PrimaryKeyType, Ran
let output: QueryCommandOutput

try {
output = await this.table.schema.dynamo.query(queryInput)
output = await this.table.schema.dynamo.query(queryInput, requestOptions)
} catch (ex) {
throw new HelpfulError(ex, this.table, queryInput)
}
Expand Down Expand Up @@ -294,11 +296,11 @@ export class PrimaryKey<T extends Table, HashKeyType extends PrimaryKeyType, Ran
return scanInput
}

public async scan(filters?: QueryFilters<T> | undefined | null, input?: PrimaryKeyScanInput): Promise<QueryOutput<T>> {
public async scan(filters?: QueryFilters<T> | undefined | null, input?: PrimaryKeyScanInput, requestOptions?: IRequestOptions): Promise<QueryOutput<T>> {
const scanInput = this.getScanInput(input, filters == null ? undefined : filters)
let output: ScanCommandOutput
try {
output = await this.table.schema.dynamo.scan(scanInput)
output = await this.table.schema.dynamo.scan(scanInput, requestOptions)
} catch (ex) {
throw new HelpfulError(ex, this.table, scanInput)
}
Expand Down
19 changes: 10 additions & 9 deletions src/query/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { type LocalSecondaryIndex } from './local-secondary-index'
import { QueryOutput } from './output'
import { type PrimaryKey } from './primary-key'
import { buildProjectionExpression } from './projection-expression'
import { type IRequestOptions } from '../connections'

type Index<T extends Table> = PrimaryKey<T, any, any> | GlobalSecondaryIndex<T> | LocalSecondaryIndex<T> | string

Expand Down Expand Up @@ -231,9 +232,9 @@ export class MagicSearch<T extends Table> {
*
* A promise will be returned that will resolve to the results array upon completion.
*/
async exec(): Promise<QueryOutput<T>> {
async exec(requestOptions?: IRequestOptions): Promise<QueryOutput<T>> {
const input = this.getInput()
return await this.page(input)
return await this.page(input, requestOptions)
}

/**
Expand All @@ -249,7 +250,7 @@ export class MagicSearch<T extends Table> {
* It is recommended you apply a `.limit(minOrMore)` before calling `.minimum` to ensure
* you do not load too many results as well.
*/
async minimum(minimum: number): Promise<QueryOutput<T>> {
async minimum(minimum: number, requestOptions?: IRequestOptions): Promise<QueryOutput<T>> {
const input = this.getInput()
const outputs: Array<QueryOutput<T>> = []
let page: QueryOutput<T> | undefined
Expand All @@ -260,7 +261,7 @@ export class MagicSearch<T extends Table> {
input.ExclusiveStartKey = page.lastEvaluatedKey
}

page = await this.page(input)
page = await this.page(input, requestOptions)
count += page.count
outputs.push(page)

Expand All @@ -280,7 +281,7 @@ export class MagicSearch<T extends Table> {
* This is also non-ideal for scans, for better performance use a segmented scan
* via the Query.PrimaryKey.segmentedScan or Query.GlobalSecondaryIndex.segmentedScan.
*/
async all(): Promise<QueryOutput<T>> {
async all(requestOptions?: IRequestOptions): Promise<QueryOutput<T>> {
const input = this.getInput()
const outputs: Array<QueryOutput<T>> = []
let page: QueryOutput<T> | undefined
Expand All @@ -291,7 +292,7 @@ export class MagicSearch<T extends Table> {
input.ExclusiveStartKey = page.lastEvaluatedKey
}

page = await this.page(input)
page = await this.page(input, requestOptions)
outputs.push(page)
}

Expand Down Expand Up @@ -404,14 +405,14 @@ export class MagicSearch<T extends Table> {
return await this.exec()
}

async page(input: ScanCommandInput | QueryCommandInput): Promise<QueryOutput<T>> {
async page(input: ScanCommandInput | QueryCommandInput, requestOptions?: IRequestOptions): Promise<QueryOutput<T>> {
const hasProjection = input.ProjectionExpression == null
let output: ScanCommandOutput | QueryCommandOutput

// if we are filtering based on key conditions, run a query instead of a scan
if ((input as QueryCommandInput).KeyConditionExpression != null) {
try {
output = await this.tableClass.schema.dynamo.query(input)
output = await this.tableClass.schema.dynamo.query(input, requestOptions)
} catch (ex) {
throw new HelpfulError(ex, this.tableClass, input)
}
Expand All @@ -423,7 +424,7 @@ export class MagicSearch<T extends Table> {
}

try {
output = await this.tableClass.schema.dynamo.scan(input)
output = await this.tableClass.schema.dynamo.scan(input, requestOptions)
} catch (ex) {
throw new HelpfulError(ex, this.tableClass, input)
}
Expand Down
5 changes: 3 additions & 2 deletions src/tables/describe-table.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { type DescribeTableCommandInput, type TableDescription } from '@aws-sdk/client-dynamodb'
import { type Schema } from './schema'
import { type IRequestOptions } from '../connections'

export async function describeTable(schema: Schema): Promise<TableDescription> {
export async function describeTable(schema: Schema, requestOptions?: IRequestOptions): Promise<TableDescription> {
const params: DescribeTableCommandInput = {
TableName: schema.name,
}

const result = await schema.dynamo.describeTable(params)
const result = await schema.dynamo.describeTable(params, requestOptions)
return result.Table as TableDescription
}

0 comments on commit d74b44a

Please sign in to comment.