Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(db-dynamodb): tools for batch write #4445

Draft
wants to merge 5 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/api-elasticsearch-tasks/src/definitions/entry.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* TODO If adding GSIs to the Elasticsearch table, add them here.
*/
import { Entity, TableDef } from "@webiny/db-dynamodb/toolbox";

interface Params {
Expand All @@ -24,6 +27,9 @@ export const createEntry = (params: Params): Entity<any> => {
},
data: {
type: "map"
},
TYPE: {
type: "string"
}
}
});
Expand Down
17 changes: 2 additions & 15 deletions packages/api-elasticsearch-tasks/src/tasks/Manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,7 @@ import { createEntry } from "~/definitions/entry";
import { Entity } from "@webiny/db-dynamodb/toolbox";
import { ITaskResponse } from "@webiny/tasks/response/abstractions";
import { IIsCloseToTimeoutCallable, ITaskManagerStore } from "@webiny/tasks/runner/abstractions";
import {
batchReadAll,
BatchReadItem,
batchWriteAll,
BatchWriteItem,
BatchWriteResult
} from "@webiny/db-dynamodb";
import { batchReadAll, BatchReadItem } from "@webiny/db-dynamodb";
import { ITimer } from "@webiny/handler-aws/utils";

export interface ManagerParams<T> {
Expand Down Expand Up @@ -75,17 +69,10 @@ export class Manager<T> implements IManager<T> {
}));
}

public async read<T>(items: BatchReadItem[]) {
public async read<T>(items: BatchReadItem[]): Promise<T[]> {
return await batchReadAll<T>({
table: this.table,
items
});
}

public async write(items: BatchWriteItem[]): Promise<BatchWriteResult> {
return await batchWriteAll({
table: this.table,
items
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
} from "~/types";
import { ITaskResponse, ITaskResponseResult } from "@webiny/tasks/response/abstractions";
import { scan } from "~/helpers/scan";
import { BatchWriteItem, ScanResponse } from "@webiny/db-dynamodb";
import { createTableWriteBatch, ScanResponse } from "@webiny/db-dynamodb";
import { IndexManager } from "~/settings";
import { IIndexManager } from "~/settings/types";

Expand Down Expand Up @@ -73,7 +73,10 @@ export class ReindexingTaskRunner {
return this.response.done("No more items to process.");
}

const batch: BatchWriteItem[] = [];
const tableWriteBatch = createTableWriteBatch({
table: this.manager.table
});

for (const item of results.items) {
/**
* No index defined? Impossible but let's skip if really happens.
Expand Down Expand Up @@ -110,14 +113,13 @@ export class ReindexingTaskRunner {
/**
* Reindexing will be triggered by the `putBatch` method.
*/
batch.push(
entity.putBatch({
...item,
modified: new Date().toISOString()
})
);
tableWriteBatch.put(entity, {
...item,
TYPE: item.TYPE || "unknown",
modified: new Date().toISOString()
});
}
await this.manager.write(batch);
await tableWriteBatch.execute();
/**
* We always store the index settings, so we can restore them later.
* Also, we always want to store what was the last key we processed, just in case something breaks, so we can continue from this point.
Expand Down
26 changes: 14 additions & 12 deletions packages/api-elasticsearch-tasks/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import { ElasticsearchContext } from "@webiny/api-elasticsearch/types";
import { Entity } from "@webiny/db-dynamodb/toolbox";
import {
import type { ElasticsearchContext } from "@webiny/api-elasticsearch/types";
import type { Entity } from "@webiny/db-dynamodb/toolbox";
import type {
Context as TasksContext,
IIsCloseToTimeoutCallable,
ITaskManagerStore,
ITaskResponse,
ITaskResponseDoneResultOutput
} from "@webiny/tasks/types";
import { DynamoDBDocument } from "@webiny/aws-sdk/client-dynamodb";
import { Client } from "@webiny/api-elasticsearch";
import type { DynamoDBDocument } from "@webiny/aws-sdk/client-dynamodb";
import type { Client } from "@webiny/api-elasticsearch";
import { createTable } from "~/definitions";
import { ITaskResponse } from "@webiny/tasks/response/abstractions";
import { ITaskManagerStore } from "@webiny/tasks/runner/abstractions";
import { BatchWriteItem, BatchWriteResult } from "@webiny/db-dynamodb";
import { ITimer } from "@webiny/handler-aws";
import type { BatchReadItem } from "@webiny/db-dynamodb";
import type { ITimer } from "@webiny/handler-aws";
import type { GenericRecord } from "@webiny/api/types";

export interface Context extends ElasticsearchContext, TasksContext {}

Expand Down Expand Up @@ -42,17 +43,18 @@ export interface IElasticsearchIndexingTaskValues {
}

export interface AugmentedError extends Error {
data?: Record<string, any>;
data?: GenericRecord;
[key: string]: any;
}

export interface IDynamoDbElasticsearchRecord {
PK: string;
SK: string;
TYPE?: string;
index: string;
_et?: string;
entity: string;
data: Record<string, any>;
data: GenericRecord;
modified: string;
}

Expand All @@ -72,5 +74,5 @@ export interface IManager<

getEntity: (name: string) => Entity<any>;

write: (items: BatchWriteItem[]) => Promise<BatchWriteResult>;
read<T>(items: BatchReadItem[]): Promise<T[]>;
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import { DynamoDBDocument } from "@webiny/aws-sdk/client-dynamodb";
import { Entity, Table } from "@webiny/db-dynamodb/toolbox";
import {
FileManagerAliasesStorageOperations,
import type { DynamoDBDocument } from "@webiny/aws-sdk/client-dynamodb";
import type { Entity, Table } from "@webiny/db-dynamodb/toolbox";
import type {
File,
FileAlias
FileAlias,
FileManagerAliasesStorageOperations
} from "@webiny/api-file-manager/types";
import {
BatchWriteItem,
batchWriteAll,
createEntityWriteBatch,
createStandardEntity,
createTable,
DbItem,
Expand Down Expand Up @@ -39,52 +38,49 @@ export class AliasesStorageOperations implements FileManagerAliasesStorageOperat

async deleteAliases(file: File): Promise<void> {
const aliasItems = await this.getExistingAliases(file);
const items: BatchWriteItem[] = [];

aliasItems.forEach(item => {
items.push(
this.aliasEntity.deleteBatch({
const batchWrite = createEntityWriteBatch({
entity: this.aliasEntity,
delete: aliasItems.map(item => {
return {
PK: this.createPartitionKey({
id: item.fileId,
tenant: item.tenant,
locale: item.locale
}),
SK: `ALIAS#${item.alias}`
})
);
};
})
});

await batchWriteAll({ table: this.table, items });
await batchWrite.execute();
}

async storeAliases(file: File): Promise<void> {
const items: BatchWriteItem[] = [];
const existingAliases = await this.getExistingAliases(file);
const newAliases = this.createNewAliasesRecords(file, existingAliases);

newAliases.forEach(alias => {
items.push(this.aliasEntity.putBatch(alias));
const batchWrite = createEntityWriteBatch({
entity: this.aliasEntity
});
for (const alias of newAliases) {
batchWrite.put(alias);
}

// Delete aliases that are in the DB but are NOT in the file.
for (const data of existingAliases) {
if (!file.aliases.some(alias => data.alias === alias)) {
items.push(
this.aliasEntity.deleteBatch({
PK: this.createPartitionKey(file),
SK: `ALIAS#${data.alias}`
})
);
batchWrite.delete({
PK: this.createPartitionKey(file),
SK: `ALIAS#${data.alias}`
});
}
}

await batchWriteAll({
table: this.table,
items
});
await batchWrite.execute();
}

private async getExistingAliases(file: File) {
private async getExistingAliases(file: File): Promise<FileAlias[]> {
const aliases = await queryAll<{ data: FileAlias }>({
entity: this.aliasEntity,
partitionKey: this.createPartitionKey(file),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ export const createElasticsearchEntity = (params: Params) => {
TYPE: {
type: "string"
},

...(attributes || {})
}
});
Expand Down
Loading
Loading