diff --git a/packages/api-dynamodb-to-elasticsearch/src/execute.ts b/packages/api-dynamodb-to-elasticsearch/src/execute.ts index e98155ac7e4..789ae9e2b14 100644 --- a/packages/api-dynamodb-to-elasticsearch/src/execute.ts +++ b/packages/api-dynamodb-to-elasticsearch/src/execute.ts @@ -59,7 +59,7 @@ const checkErrors = (result?: ApiResponse): void => } continue; } - console.error(item.error); + console.error("Body item with error", item); throw new WebinyError(err, "DYNAMODB_TO_ELASTICSEARCH_ERROR", item); } }; diff --git a/packages/api-elasticsearch-tasks/__tests__/mocks/context.ts b/packages/api-elasticsearch-tasks/__tests__/mocks/context.ts index 3c019b30085..738f421a2d5 100644 --- a/packages/api-elasticsearch-tasks/__tests__/mocks/context.ts +++ b/packages/api-elasticsearch-tasks/__tests__/mocks/context.ts @@ -1,7 +1,7 @@ import { PluginsContainer } from "@webiny/plugins"; import { PartialDeep } from "type-fest"; import { createMockIdentity } from "~tests/mocks/identity"; -import { +import type { Context, ITaskLogUpdateInput, ITaskUpdateData, @@ -10,31 +10,54 @@ import { import { ElasticsearchContext } from "@webiny/api-elasticsearch/types"; // @ts-expect-error import { createMockApiLog } from "@webiny/project-utils/testing/mockApiLog"; +import type { Tenant } from "@webiny/api-tenancy/types"; +import type { Context as LoggerContext } from "@webiny/api-log/types"; export const createContextMock = ( - params?: PartialDeep -): Context & ElasticsearchContext => { + params?: PartialDeep +): Context & ElasticsearchContext & LoggerContext => { + const tenants: Tenant[] = [ + { + id: "root", + name: "Root", + parent: null + } as Tenant + ]; + const locales = [ + { + code: "en-US", + default: true + } + ]; return { logger: createMockApiLog(), tenancy: { listTenants: async () => { - return [ - { - id: "root", - name: "Root", - parent: null - } - ]; + return tenants; + }, + withEachTenant: async (input: Tenant[], cb: (t: Tenant) => Promise) => { + const results = []; + for (const t of input) { + results.push(await cb(t)); + } + return results; } }, i18n: { + locales: { + listLocales: async () => { + return [ + locales, + { + totalCount: locales.length, + hasMoreItems: false, + cursor: null + } + ]; + } + }, getLocales: async () => { - return [ - { - code: "en-US", - default: true - } - ]; + return locales; } }, ...params, @@ -66,5 +89,5 @@ export const createContextMock = ( }, ...params?.tasks } - } as unknown as Context & ElasticsearchContext; + } as unknown as Context & ElasticsearchContext & LoggerContext; }; diff --git a/packages/api-elasticsearch-tasks/__tests__/tasks/createIndexes/createIndexesTaskRunner.test.ts b/packages/api-elasticsearch-tasks/__tests__/tasks/createIndexes/createIndexesTaskRunner.test.ts index d2c1509b306..2f06298e0bb 100644 --- a/packages/api-elasticsearch-tasks/__tests__/tasks/createIndexes/createIndexesTaskRunner.test.ts +++ b/packages/api-elasticsearch-tasks/__tests__/tasks/createIndexes/createIndexesTaskRunner.test.ts @@ -13,7 +13,10 @@ import { ElasticsearchClient } from "@webiny/project-utils/testing/elasticsearch/createClient"; +import { timerFactory } from "@webiny/handler-aws"; + describe("create indexes task runner", () => { + const timer = timerFactory(); let elasticsearchClient: ElasticsearchClient; beforeEach(async () => { @@ -36,7 +39,8 @@ describe("create indexes task runner", () => { }, isAborted: () => { return false; - } + }, + timer }); const indexManager = createIndexManagerMock(); const runner = new CreateIndexesTaskRunner(manager, indexManager); @@ -79,7 +83,8 @@ describe("create indexes task runner", () => { }, isAborted: () => { return false; - } + }, + timer }); const indexManager = createIndexManagerMock({ client: elasticsearchClient diff --git a/packages/api-elasticsearch-tasks/package.json b/packages/api-elasticsearch-tasks/package.json index 7d1ffdac61a..f4d7f87fd2a 100644 --- a/packages/api-elasticsearch-tasks/package.json +++ b/packages/api-elasticsearch-tasks/package.json @@ -15,27 +15,27 @@ "@webiny/api": "0.0.0", "@webiny/api-dynamodb-to-elasticsearch": "0.0.0", "@webiny/api-elasticsearch": "0.0.0", + "@webiny/api-i18n": "0.0.0", "@webiny/api-log": "0.0.0", + "@webiny/api-security": "0.0.0", + "@webiny/api-tenancy": "0.0.0", "@webiny/aws-sdk": "0.0.0", "@webiny/db": "0.0.0", "@webiny/db-dynamodb": "0.0.0", "@webiny/error": "0.0.0", + "@webiny/plugins": "0.0.0", "@webiny/tasks": "0.0.0", "@webiny/utils": "0.0.0" }, "devDependencies": { "@webiny/api": "0.0.0", "@webiny/api-headless-cms": "0.0.0", - "@webiny/api-i18n": "0.0.0", - "@webiny/api-security": "0.0.0", - "@webiny/api-tenancy": "0.0.0", "@webiny/api-wcp": "0.0.0", "@webiny/cli": "0.0.0", "@webiny/handler": "0.0.0", "@webiny/handler-aws": "0.0.0", "@webiny/handler-db": "0.0.0", "@webiny/handler-graphql": "0.0.0", - "@webiny/plugins": "0.0.0", "@webiny/project-utils": "0.0.0", "rimraf": "^6.0.1", "ttypescript": "^1.5.15", diff --git a/packages/api-elasticsearch-tasks/src/tasks/createIndexes/CreateIndexesTaskRunner.ts b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/CreateIndexesTaskRunner.ts index 62b572e0845..2359d9957f1 100644 --- a/packages/api-elasticsearch-tasks/src/tasks/createIndexes/CreateIndexesTaskRunner.ts +++ b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/CreateIndexesTaskRunner.ts @@ -1,12 +1,11 @@ -import { Manager } from "~/tasks/Manager"; -import { IndexManager } from "~/settings"; -import { ITaskResponseResult } from "@webiny/tasks"; -import { - CreateElasticsearchIndexTaskPlugin, - CreateElasticsearchIndexTaskPluginIndex -} from "./CreateElasticsearchIndexTaskPlugin"; -import { Context } from "~/types"; -import { IElasticsearchCreateIndexesTaskInput } from "~/tasks/createIndexes/types"; +import type { Manager } from "~/tasks/Manager"; +import type { IndexManager } from "~/settings"; +import type { ITaskResponseResult } from "@webiny/tasks"; +import type { IElasticsearchCreateIndexesTaskInput } from "./types"; +import { listIndexes } from "./listIndexes"; +import { createIndexFactory } from "./createIndex"; +import type { Context } from "~/types"; +import { listCreateElasticsearchIndexTaskPlugin } from "./listCreateElasticsearchIndexTaskPlugin"; export class CreateIndexesTaskRunner { private readonly manager: Manager; @@ -25,34 +24,18 @@ export class CreateIndexesTaskRunner { matching: string | undefined, done: string[] ): Promise { - const plugins = this.manager.context.plugins.byType< - CreateElasticsearchIndexTaskPlugin - >(CreateElasticsearchIndexTaskPlugin.type); + const plugins = listCreateElasticsearchIndexTaskPlugin( + this.manager.context.plugins + ); if (plugins.length === 0) { return this.manager.response.done("No index plugins found."); } - const indexes: CreateElasticsearchIndexTaskPluginIndex[] = []; - const tenants = await this.manager.context.tenancy.listTenants(); + const indexes = await listIndexes({ + context: this.manager.context, + plugins + }); - for (const tenant of tenants) { - const locales = await this.manager.context.i18n.getLocales(); - for (const locale of locales) { - for (const plugin of plugins) { - const results = await plugin.getIndexList({ - context: this.manager.context, - tenant: tenant.id, - locale: locale.code - }); - for (const result of results) { - if (indexes.some(i => i.index === result.index)) { - continue; - } - indexes.push(result); - } - } - } - } if (indexes.length === 0) { return this.manager.response.done("No indexes found."); } @@ -64,6 +47,8 @@ export class CreateIndexesTaskRunner { return index.includes(matching); }; + const createIndex = createIndexFactory(this.indexManager); + for (const { index, settings } of indexes) { if (this.manager.isAborted()) { return this.manager.response.aborted(); @@ -83,7 +68,7 @@ export class CreateIndexesTaskRunner { continue; } done.push(index); - await this.indexManager.createIndex(index, settings); + await createIndex.create(index, settings); await this.manager.store.addInfoLog({ message: `Index "${index}" created.`, data: { diff --git a/packages/api-elasticsearch-tasks/src/tasks/createIndexes/OnBeforeTrigger.ts b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/OnBeforeTrigger.ts new file mode 100644 index 00000000000..665a3edb377 --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/OnBeforeTrigger.ts @@ -0,0 +1,64 @@ +import type { Context } from "~/types"; +import type { IndexManager } from "~/settings"; +import { listIndexes } from "./listIndexes"; +import { createIndexFactory } from "~/tasks/createIndexes/createIndex"; +import { listCreateElasticsearchIndexTaskPlugin } from "~/tasks/createIndexes/listCreateElasticsearchIndexTaskPlugin"; + +export interface IOnBeforeTriggerParams { + indexManager: IndexManager; + context: Context; +} + +export class OnBeforeTrigger { + private readonly context: Context; + private readonly indexManager: IndexManager; + + public constructor(params: IOnBeforeTriggerParams) { + this.context = params.context; + this.indexManager = params.indexManager; + } + + public async run(targets: string[] | undefined): Promise { + const plugins = listCreateElasticsearchIndexTaskPlugin(this.context.plugins); + + try { + const allIndexes = await listIndexes({ + context: this.context, + plugins + }); + const indexes = allIndexes.filter(index => { + if (!targets?.length) { + return true; + } + for (const t of targets) { + if (index.index.includes(t)) { + return true; + } + } + return false; + }); + if (indexes.length === 0) { + console.warn( + "There are no indexes to create before triggering the Create indexes task.", + { + targets + } + ); + return; + } + + const createIndex = createIndexFactory(this.indexManager); + + for (const { index, settings } of indexes) { + try { + console.log("Creating index", index); + await createIndex.createIfNotExists(index, settings); + } catch (ex) { + console.error(`Failed to create index "${index}".`, ex); + } + } + } catch (ex) { + console.error(ex); + } + } +} diff --git a/packages/api-elasticsearch-tasks/src/tasks/createIndexes/createIndex.ts b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/createIndex.ts new file mode 100644 index 00000000000..ad199436db0 --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/createIndex.ts @@ -0,0 +1,21 @@ +import { IndexManager } from "~/settings"; + +export const createIndexFactory = (manager: IndexManager) => { + return { + create: async (index: string, settings?: Record): Promise => { + return manager.createIndex(index, settings); + }, + createIfNotExists: async (index: string, settings?: Record): Promise => { + try { + const exists = await manager.indexExists(index); + if (exists) { + return; + } + } catch (ex) { + return; + } + + return await manager.createIndex(index, settings); + } + }; +}; diff --git a/packages/api-elasticsearch-tasks/src/tasks/createIndexes/index.ts b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/index.ts index 46850d171b2..2d5c67d9dd6 100644 --- a/packages/api-elasticsearch-tasks/src/tasks/createIndexes/index.ts +++ b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/index.ts @@ -37,6 +37,23 @@ export const createIndexesTaskDefinition = (params?: IElasticsearchTaskConfig) = const createIndexesTaskRunner = new CreateIndexesTaskRunner(manager, indexManager); return createIndexesTaskRunner.execute(input.matching, Array.from(input.done || [])); + }, + async onBeforeTrigger({ context }) { + // Let's create a new index for the tasks first. + const { IndexManager } = await import( + /* webpackChunkName: "IndexManager" */ "~/settings" + ); + const indexManager = new IndexManager(context.elasticsearch, {}); + const { OnBeforeTrigger } = await import( + /* webpackChunkName: "OnBeforeTrigger" */ + "./OnBeforeTrigger" + ); + + const onBeforeTrigger = new OnBeforeTrigger({ + indexManager, + context + }); + await onBeforeTrigger.run(["webinytask"]); } }); }; diff --git a/packages/api-elasticsearch-tasks/src/tasks/createIndexes/listCreateElasticsearchIndexTaskPlugin.ts b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/listCreateElasticsearchIndexTaskPlugin.ts new file mode 100644 index 00000000000..c8a97e66dc9 --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/listCreateElasticsearchIndexTaskPlugin.ts @@ -0,0 +1,11 @@ +import { CreateElasticsearchIndexTaskPlugin } from "~/tasks/createIndexes/CreateElasticsearchIndexTaskPlugin"; +import type { Context } from "~/types"; +import type { PluginsContainer } from "@webiny/plugins"; + +export const listCreateElasticsearchIndexTaskPlugin = ( + plugins: PluginsContainer +): CreateElasticsearchIndexTaskPlugin[] => { + return plugins.byType>( + CreateElasticsearchIndexTaskPlugin.type + ); +}; diff --git a/packages/api-elasticsearch-tasks/src/tasks/createIndexes/listIndexes.ts b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/listIndexes.ts new file mode 100644 index 00000000000..1edcdcac5a6 --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/listIndexes.ts @@ -0,0 +1,45 @@ +import type { CreateElasticsearchIndexTaskPluginIndex } from "~/tasks/createIndexes/CreateElasticsearchIndexTaskPlugin"; +import { CreateElasticsearchIndexTaskPlugin } from "~/tasks/createIndexes/CreateElasticsearchIndexTaskPlugin"; +import type { Context } from "~/types"; +import type { Tenant } from "@webiny/api-tenancy/types"; + +export interface IListIndexesParams { + context: Context; + plugins: CreateElasticsearchIndexTaskPlugin[]; +} + +export const listIndexes = async ( + params: IListIndexesParams +): Promise => { + const { context, plugins } = params; + if (plugins.length === 0) { + return []; + } + + const tenants = await context.tenancy.listTenants(); + const results = await context.tenancy.withEachTenant< + Tenant, + CreateElasticsearchIndexTaskPluginIndex[] + >(tenants, async tenant => { + const indexes: CreateElasticsearchIndexTaskPluginIndex[] = []; + const [locales] = await context.i18n.locales.listLocales(); + + for (const locale of locales) { + for (const plugin of plugins) { + const results = await plugin.getIndexList({ + context, + tenant: tenant.id, + locale: locale.code + }); + for (const result of results) { + if (indexes.some(i => i.index === result.index)) { + continue; + } + indexes.push(result); + } + } + } + return indexes; + }); + return results.flat(); +}; diff --git a/packages/api-elasticsearch-tasks/src/types.ts b/packages/api-elasticsearch-tasks/src/types.ts index 0595352a2bd..178e339a996 100644 --- a/packages/api-elasticsearch-tasks/src/types.ts +++ b/packages/api-elasticsearch-tasks/src/types.ts @@ -12,7 +12,7 @@ import { createTable } from "~/definitions"; import type { BatchReadItem, IEntity } from "@webiny/db-dynamodb"; import type { ITimer } from "@webiny/handler-aws"; import type { GenericRecord } from "@webiny/api/types"; -import { Context as LoggerContext } from "@webiny/api-log/types"; +import type { Context as LoggerContext } from "@webiny/api-log/types"; export interface Context extends ElasticsearchContext, TasksContext, LoggerContext {}