Skip to content

Commit

Permalink
fix(api-elasticsearch-tasks): create task index before triggering task (
Browse files Browse the repository at this point in the history
  • Loading branch information
brunozoric authored Feb 19, 2025
1 parent 76c74c3 commit 49a33e7
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 58 deletions.
2 changes: 1 addition & 1 deletion packages/api-dynamodb-to-elasticsearch/src/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const checkErrors = (result?: ApiResponse<BulkOperationsResponseBody>): void =>
}
continue;
}
console.error(item.error);
console.error("Body item with error", item);
throw new WebinyError(err, "DYNAMODB_TO_ELASTICSEARCH_ERROR", item);
}
};
Expand Down
57 changes: 40 additions & 17 deletions packages/api-elasticsearch-tasks/__tests__/mocks/context.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { PluginsContainer } from "@webiny/plugins";
import { PartialDeep } from "type-fest";
import { createMockIdentity } from "~tests/mocks/identity";
import {
import type {
Context,
ITaskLogUpdateInput,
ITaskUpdateData,
Expand All @@ -10,31 +10,54 @@ import {
import { ElasticsearchContext } from "@webiny/api-elasticsearch/types";
// @ts-expect-error
import { createMockApiLog } from "@webiny/project-utils/testing/mockApiLog";
import type { Tenant } from "@webiny/api-tenancy/types";
import type { Context as LoggerContext } from "@webiny/api-log/types";

export const createContextMock = (
params?: PartialDeep<Context & ElasticsearchContext>
): Context & ElasticsearchContext => {
params?: PartialDeep<Context & ElasticsearchContext & LoggerContext>
): Context & ElasticsearchContext & LoggerContext => {
const tenants: Tenant[] = [
{
id: "root",
name: "Root",
parent: null
} as Tenant
];
const locales = [
{
code: "en-US",
default: true
}
];
return {
logger: createMockApiLog(),
tenancy: {
listTenants: async () => {
return [
{
id: "root",
name: "Root",
parent: null
}
];
return tenants;
},
withEachTenant: async (input: Tenant[], cb: (t: Tenant) => Promise<any>) => {
const results = [];
for (const t of input) {
results.push(await cb(t));
}
return results;
}
},
i18n: {
locales: {
listLocales: async () => {
return [
locales,
{
totalCount: locales.length,
hasMoreItems: false,
cursor: null
}
];
}
},
getLocales: async () => {
return [
{
code: "en-US",
default: true
}
];
return locales;
}
},
...params,
Expand Down Expand Up @@ -66,5 +89,5 @@ export const createContextMock = (
},
...params?.tasks
}
} as unknown as Context & ElasticsearchContext;
} as unknown as Context & ElasticsearchContext & LoggerContext;
};
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import {
ElasticsearchClient
} from "@webiny/project-utils/testing/elasticsearch/createClient";

import { timerFactory } from "@webiny/handler-aws";

describe("create indexes task runner", () => {
const timer = timerFactory();
let elasticsearchClient: ElasticsearchClient;

beforeEach(async () => {
Expand All @@ -36,7 +39,8 @@ describe("create indexes task runner", () => {
},
isAborted: () => {
return false;
}
},
timer
});
const indexManager = createIndexManagerMock();
const runner = new CreateIndexesTaskRunner(manager, indexManager);
Expand Down Expand Up @@ -79,7 +83,8 @@ describe("create indexes task runner", () => {
},
isAborted: () => {
return false;
}
},
timer
});
const indexManager = createIndexManagerMock({
client: elasticsearchClient
Expand Down
8 changes: 4 additions & 4 deletions packages/api-elasticsearch-tasks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,27 @@
"@webiny/api": "0.0.0",
"@webiny/api-dynamodb-to-elasticsearch": "0.0.0",
"@webiny/api-elasticsearch": "0.0.0",
"@webiny/api-i18n": "0.0.0",
"@webiny/api-log": "0.0.0",
"@webiny/api-security": "0.0.0",
"@webiny/api-tenancy": "0.0.0",
"@webiny/aws-sdk": "0.0.0",
"@webiny/db": "0.0.0",
"@webiny/db-dynamodb": "0.0.0",
"@webiny/error": "0.0.0",
"@webiny/plugins": "0.0.0",
"@webiny/tasks": "0.0.0",
"@webiny/utils": "0.0.0"
},
"devDependencies": {
"@webiny/api": "0.0.0",
"@webiny/api-headless-cms": "0.0.0",
"@webiny/api-i18n": "0.0.0",
"@webiny/api-security": "0.0.0",
"@webiny/api-tenancy": "0.0.0",
"@webiny/api-wcp": "0.0.0",
"@webiny/cli": "0.0.0",
"@webiny/handler": "0.0.0",
"@webiny/handler-aws": "0.0.0",
"@webiny/handler-db": "0.0.0",
"@webiny/handler-graphql": "0.0.0",
"@webiny/plugins": "0.0.0",
"@webiny/project-utils": "0.0.0",
"rimraf": "^6.0.1",
"ttypescript": "^1.5.15",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { Manager } from "~/tasks/Manager";
import { IndexManager } from "~/settings";
import { ITaskResponseResult } from "@webiny/tasks";
import {
CreateElasticsearchIndexTaskPlugin,
CreateElasticsearchIndexTaskPluginIndex
} from "./CreateElasticsearchIndexTaskPlugin";
import { Context } from "~/types";
import { IElasticsearchCreateIndexesTaskInput } from "~/tasks/createIndexes/types";
import type { Manager } from "~/tasks/Manager";
import type { IndexManager } from "~/settings";
import type { ITaskResponseResult } from "@webiny/tasks";
import type { IElasticsearchCreateIndexesTaskInput } from "./types";
import { listIndexes } from "./listIndexes";
import { createIndexFactory } from "./createIndex";
import type { Context } from "~/types";
import { listCreateElasticsearchIndexTaskPlugin } from "./listCreateElasticsearchIndexTaskPlugin";

export class CreateIndexesTaskRunner {
private readonly manager: Manager<IElasticsearchCreateIndexesTaskInput>;
Expand All @@ -25,34 +24,18 @@ export class CreateIndexesTaskRunner {
matching: string | undefined,
done: string[]
): Promise<ITaskResponseResult> {
const plugins = this.manager.context.plugins.byType<
CreateElasticsearchIndexTaskPlugin<Context>
>(CreateElasticsearchIndexTaskPlugin.type);
const plugins = listCreateElasticsearchIndexTaskPlugin<Context>(
this.manager.context.plugins
);
if (plugins.length === 0) {
return this.manager.response.done("No index plugins found.");
}
const indexes: CreateElasticsearchIndexTaskPluginIndex[] = [];

const tenants = await this.manager.context.tenancy.listTenants();
const indexes = await listIndexes({
context: this.manager.context,
plugins
});

for (const tenant of tenants) {
const locales = await this.manager.context.i18n.getLocales();
for (const locale of locales) {
for (const plugin of plugins) {
const results = await plugin.getIndexList({
context: this.manager.context,
tenant: tenant.id,
locale: locale.code
});
for (const result of results) {
if (indexes.some(i => i.index === result.index)) {
continue;
}
indexes.push(result);
}
}
}
}
if (indexes.length === 0) {
return this.manager.response.done("No indexes found.");
}
Expand All @@ -64,6 +47,8 @@ export class CreateIndexesTaskRunner {
return index.includes(matching);
};

const createIndex = createIndexFactory(this.indexManager);

for (const { index, settings } of indexes) {
if (this.manager.isAborted()) {
return this.manager.response.aborted();
Expand All @@ -83,7 +68,7 @@ export class CreateIndexesTaskRunner {
continue;
}
done.push(index);
await this.indexManager.createIndex(index, settings);
await createIndex.create(index, settings);
await this.manager.store.addInfoLog({
message: `Index "${index}" created.`,
data: {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import type { Context } from "~/types";
import type { IndexManager } from "~/settings";
import { listIndexes } from "./listIndexes";
import { createIndexFactory } from "~/tasks/createIndexes/createIndex";
import { listCreateElasticsearchIndexTaskPlugin } from "~/tasks/createIndexes/listCreateElasticsearchIndexTaskPlugin";

export interface IOnBeforeTriggerParams {
indexManager: IndexManager;
context: Context;
}

export class OnBeforeTrigger {
private readonly context: Context;
private readonly indexManager: IndexManager;

public constructor(params: IOnBeforeTriggerParams) {
this.context = params.context;
this.indexManager = params.indexManager;
}

public async run(targets: string[] | undefined): Promise<void> {
const plugins = listCreateElasticsearchIndexTaskPlugin<Context>(this.context.plugins);

try {
const allIndexes = await listIndexes({
context: this.context,
plugins
});
const indexes = allIndexes.filter(index => {
if (!targets?.length) {
return true;
}
for (const t of targets) {
if (index.index.includes(t)) {
return true;
}
}
return false;
});
if (indexes.length === 0) {
console.warn(
"There are no indexes to create before triggering the Create indexes task.",
{
targets
}
);
return;
}

const createIndex = createIndexFactory(this.indexManager);

for (const { index, settings } of indexes) {
try {
console.log("Creating index", index);
await createIndex.createIfNotExists(index, settings);
} catch (ex) {
console.error(`Failed to create index "${index}".`, ex);
}
}
} catch (ex) {
console.error(ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { IndexManager } from "~/settings";

export const createIndexFactory = (manager: IndexManager) => {
return {
create: async (index: string, settings?: Record<string, any>): Promise<void> => {
return manager.createIndex(index, settings);
},
createIfNotExists: async (index: string, settings?: Record<string, any>): Promise<void> => {
try {
const exists = await manager.indexExists(index);
if (exists) {
return;
}
} catch (ex) {
return;
}

return await manager.createIndex(index, settings);
}
};
};
17 changes: 17 additions & 0 deletions packages/api-elasticsearch-tasks/src/tasks/createIndexes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ export const createIndexesTaskDefinition = (params?: IElasticsearchTaskConfig) =
const createIndexesTaskRunner = new CreateIndexesTaskRunner(manager, indexManager);

return createIndexesTaskRunner.execute(input.matching, Array.from(input.done || []));
},
async onBeforeTrigger({ context }) {
// Let's create a new index for the tasks first.
const { IndexManager } = await import(
/* webpackChunkName: "IndexManager" */ "~/settings"
);
const indexManager = new IndexManager(context.elasticsearch, {});
const { OnBeforeTrigger } = await import(
/* webpackChunkName: "OnBeforeTrigger" */
"./OnBeforeTrigger"
);

const onBeforeTrigger = new OnBeforeTrigger({
indexManager,
context
});
await onBeforeTrigger.run(["webinytask"]);
}
});
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { CreateElasticsearchIndexTaskPlugin } from "~/tasks/createIndexes/CreateElasticsearchIndexTaskPlugin";
import type { Context } from "~/types";
import type { PluginsContainer } from "@webiny/plugins";

export const listCreateElasticsearchIndexTaskPlugin = <C extends Context = Context>(
plugins: PluginsContainer
): CreateElasticsearchIndexTaskPlugin<C>[] => {
return plugins.byType<CreateElasticsearchIndexTaskPlugin<Context>>(
CreateElasticsearchIndexTaskPlugin.type
);
};
Loading

0 comments on commit 49a33e7

Please sign in to comment.