diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index decbd8a0..23589d3d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,7 +36,7 @@ jobs: - name: Run Tests run: | - npm run test + npm run test:ci automerge: needs: build diff --git a/.github/workflows/release.background-jobs-common.yml b/.github/workflows/release.background-jobs-common.yml new file mode 100644 index 00000000..0daefb6f --- /dev/null +++ b/.github/workflows/release.background-jobs-common.yml @@ -0,0 +1,18 @@ +name: Package release to NPM -> background-jobs-common +on: + pull_request: + types: + - closed + branches: + - main + paths: + - 'packages/app/background-jobs-common/**' + +jobs: + call-build-flow: + uses: lokalise/shared-ts-libs/.github/workflows/release.package.yml@main + with: + working_directory: 'packages/app/background-jobs-common' + package_name: 'background-jobs-common' + secrets: + npm_token: ${{ secrets.NPM_TOKEN }} diff --git a/.github/workflows/release.package.yml b/.github/workflows/release.package.yml index 1e747e1e..e5c885fe 100644 --- a/.github/workflows/release.package.yml +++ b/.github/workflows/release.package.yml @@ -36,7 +36,6 @@ jobs: - name: Build Package run: npm run build - working-directory: ${{ inputs.working_directory }} - name: Setup git config run: | diff --git a/package.json b/package.json index a514cee9..58336827 100644 --- a/package.json +++ b/package.json @@ -6,8 +6,8 @@ "packages/app/*" ], "scripts": { - "build": "npm run build --workspaces --if-present", - "test": "npm run test --workspaces --if-present", + "build": "npm run build -w packages/app/id-utils && npm run build --workspaces --if-present", + "test:ci": "npm run test:ci --workspaces --if-present", "lint": "npm run lint --workspaces --if-present", "lint:fix": "npm run lint:fix --workspaces --if-present" }, diff --git a/packages/app/api-common/package.json b/packages/app/api-common/package.json index 519034a1..84be6091 100644 --- a/packages/app/api-common/package.json +++ b/packages/app/api-common/package.json @@ -27,7 +27,7 @@ "clean": "rimraf dist .eslintcache", "lint": "eslint --cache --max-warnings=0 . && prettier --check --log-level warn src test \"**/*.{json,md}\" && tsc --noEmit", "lint:fix": "eslint . 
--fix && prettier --write src test \"**/*.{json,md}\"", - "test": "vitest run --coverage", + "test:ci": "vitest run --coverage", "prepublishOnly": "npm run build", "package-version": "echo $npm_package_version" }, diff --git a/packages/app/background-jobs-common/.env.test b/packages/app/background-jobs-common/.env.test new file mode 100644 index 00000000..f57dbea7 --- /dev/null +++ b/packages/app/background-jobs-common/.env.test @@ -0,0 +1,7 @@ +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_USERNAME= +REDIS_PASSWORD=sOmE_sEcUrE_pAsS +REDIS_USE_TLS=false +REDIS_CONNECT_TIMEOUT= +REDIS_COMMAND_TIMEOUT= diff --git a/packages/app/background-jobs-common/.eslintignore b/packages/app/background-jobs-common/.eslintignore new file mode 100644 index 00000000..b9470778 --- /dev/null +++ b/packages/app/background-jobs-common/.eslintignore @@ -0,0 +1,2 @@ +node_modules/ +dist/ diff --git a/packages/app/background-jobs-common/.eslintrc.json b/packages/app/background-jobs-common/.eslintrc.json new file mode 100644 index 00000000..ff8536ae --- /dev/null +++ b/packages/app/background-jobs-common/.eslintrc.json @@ -0,0 +1,6 @@ +{ + "extends": ["@lokalise/eslint-config/shared-package"], + "parserOptions": { + "project": "./tsconfig.lint.json" + } +} diff --git a/packages/app/background-jobs-common/.gitignore b/packages/app/background-jobs-common/.gitignore new file mode 100644 index 00000000..03f97ad9 --- /dev/null +++ b/packages/app/background-jobs-common/.gitignore @@ -0,0 +1,4 @@ +node_modules +dist +coverage +.eslintcache diff --git a/packages/app/background-jobs-common/LICENSE b/packages/app/background-jobs-common/LICENSE new file mode 100644 index 00000000..08a6e77f --- /dev/null +++ b/packages/app/background-jobs-common/LICENSE @@ -0,0 +1,190 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + Copyright 2024 Lokalise + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/packages/app/background-jobs-common/README.md b/packages/app/background-jobs-common/README.md new file mode 100644 index 00000000..63312cb0 --- /dev/null +++ b/packages/app/background-jobs-common/README.md @@ -0,0 +1,79 @@ +# Common background jobs library + +This library provides a basic abstraction over BullMQ-powered background jobs. There are two base classes available: + +- AbstractBackgroundJobProcessor: a base class for running jobs; it provides New Relic and logger integration plus a + basic API for enqueuing jobs. +- AbstractStepBasedJobProcessor: a base class for step-based jobs. Logic is defined in classes that implement + the `JobStep` interface and operate on job data that extends the `StepBasedJobData` type + +## Getting Started + +Install all dependencies: + +```shell +npm install +``` + +Run all tests: + +```shell +npm run test +``` + +## Usage + +See the test implementations in the `./test/processors` folder. Extend either `AbstractBackgroundJobProcessor` or +`AbstractStepBasedJobProcessor` and implement the required methods. + +### Common jobs + +For this type of job, extend `AbstractBackgroundJobProcessor` and implement the `process` method. +It will be called when a job is dequeued. Processing logic is automatically wrapped in New Relic transaction tracking and basic logging calls, +so you only need to add your domain logic. + +Both the queue and the worker are automatically started when you instantiate the processor.
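The following is a minimal sketch of such a processor, modeled on the `FakeBackgroundJobProcessor` test helper included in this diff. The `EmailJobData` payload type, the `send-email` queue id, and the logging inside `process` are illustrative assumptions rather than part of the library, and the exact generic parameters may differ from what is shown here.

```typescript
import type { Job } from 'bullmq'
import {
  AbstractBackgroundJobProcessor,
  type BackgroundJobProcessorDependencies,
  type RequestContext,
} from '@lokalise/background-jobs-common'

// Illustrative payload type, not part of the library
type EmailJobData = {
  id: string
  recipient: string
}

export class SendEmailJobProcessor extends AbstractBackgroundJobProcessor<EmailJobData> {
  constructor(dependencies: BackgroundJobProcessorDependencies<EmailJobData>) {
    super(dependencies, {
      queueId: 'send-email', // queue ids must be unique across processors
      isTest: false,
      workerOptions: { concurrency: 5 },
    })
  }

  protected async process(job: Job<EmailJobData>, requestContext: RequestContext): Promise<void> {
    // Domain logic goes here; log lines written via the request context logger
    // are also mirrored into the BullMQ job log
    requestContext.logger.info({ recipient: job.data.recipient }, 'Sending email')
  }

  protected async onFailed(_job: Job<EmailJobData>, _error: Error): Promise<void> {
    // Called when the job throws an UnrecoverableError, stalls, or exhausts its retries
  }
}
```

Call `await processor.start()` before `schedule()` or `scheduleBulk()`; scheduling on a processor that has not been started throws, as the accompanying spec asserts.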
There is a default configuration which +you can override by passing `queueConfig.queueOptions` and `workerOptions` params to the constructor. + +Use `dispose()` to correctly stop processing any new messages and wait for the current ones to finish. + +### Step-based jobs + +To create a step-based job, extend the `AbstractStepBasedJobProcessor`. This is a more complex type of job processor (based on the previous one) - it can only run via specific classes which +implement the actual logic, and it has some restrictions on the job data generic type. + +You will need to implement the following methods: + +#### `getStepTransitions(): Record | null>` + +Define a map of your job steps here: keys correspond to the current job state (`JobData.execution.state`) and values +are instances of a `JobStep` interface or `null` values (they finish the job execution). Each step has to implement a +`run` method which returns a new `execution` object - it will replace the existing one after +the step is finished. + +Example implementation: + +```typescript +protected getStepTransitions(): Record | null> { + return { + initial: new StepFirst(), + 'other-state': new StepSecond(), + completed: null, + } +} +``` + +In the example above (depending on the `getDefaultExecutionState` implementation), the job will start in the `initial` +state and proceed as follows: + +1. `StepFirst` will be executed +2. (if `execution.state` has changed to `other-state`) `StepSecond` will be executed +3. (if `execution.state` has changed to `completed`) the job will be finished + +#### `getDefaultExecutionState(): JobData['execution']` + +Define the default job execution state here. It will be used when a job is scheduled. + +#### `onError(error: Error | unknown, job: Job): Promise` + +Define the error handler here. It will be called when any of the steps throws an exception. diff --git a/packages/app/background-jobs-common/docker-compose.yml b/packages/app/background-jobs-common/docker-compose.yml new file mode 100644 index 00000000..dbb49a62 --- /dev/null +++ b/packages/app/background-jobs-common/docker-compose.yml @@ -0,0 +1,7 @@ +services: + redis: + image: redis:6.2.7-alpine + command: redis-server --requirepass sOmE_sEcUrE_pAsS + ports: + - ${DOCKER_REDIS_PORT:-6379}:6379 + restart: on-failure diff --git a/packages/app/background-jobs-common/package.json b/packages/app/background-jobs-common/package.json new file mode 100644 index 00000000..8d1e89bf --- /dev/null +++ b/packages/app/background-jobs-common/package.json @@ -0,0 +1,60 @@ +{ + "name": "@lokalise/background-jobs-common", + "version": "1.0.0", + "files": [ + "dist", + "LICENSE", + "README.md" + ], + "author": { + "name": "Lokalise", + "url": "https://lokalise.com/" + }, + "homepage": "https://github.com/lokalise/shared-ts-libs", + "repository": { + "type": "git", + "url": "git://github.com/lokalise/shared-ts-libs.git" + }, + "private": false, + "license": "Apache-2.0", + "type": "commonjs", + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "scripts": { + "build": "rimraf dist && tsc", + "clean": "rimraf dist .eslintcache", + "lint": "eslint --cache --max-warnings=0 . && prettier --check --log-level warn src \"**/*.{json,md,ts,tsx}\" && tsc --noEmit", + "lint:fix": "prettier --write src \"**/*.{json,md,ts,tsx}\" --log-level=warn && eslint . 
--fix", + "docker:start:ci": "docker compose up -d --quiet-pull redis", + "test": "vitest run", + "pretest:ci": "npm run docker:start:ci", + "test:ci": "npm run test -- --coverage", + "test:ci:teardown": "docker compose down", + "prepublishOnly": "npm run build", + "package-version": "echo $npm_package_version" + }, + "dependencies": { + "@lokalise/id-utils": "1.0.0", + "@lokalise/node-core": "^9.12.0", + "pino": "^8.19.0", + "ts-deepmerge": "^7.0.0" + }, + "peerDependencies": { + "bullmq": "^5.4.2" + }, + "devDependencies": { + "@types/node": "^20.12.7", + "@lokalise/eslint-config": "latest", + "@lokalise/fastify-extras": "^17.0.3", + "@lokalise/prettier-config": "latest", + "@lokalise/package-vite-config": "latest", + "@vitest/coverage-v8": "^1.4.0", + "bullmq": "^5.4.2", + "ioredis": "^5.3.2", + "prettier": "3.2.5", + "rimraf": "^5.0.5", + "typescript": "5.4.5", + "vitest": "^1.4.0" + }, + "prettier": "@lokalise/prettier-config" +} diff --git a/packages/app/background-jobs-common/src/background-job-processor/BackgroundJobProcessorLogger.spec.ts b/packages/app/background-jobs-common/src/background-job-processor/BackgroundJobProcessorLogger.spec.ts new file mode 100644 index 00000000..65314ffa --- /dev/null +++ b/packages/app/background-jobs-common/src/background-job-processor/BackgroundJobProcessorLogger.spec.ts @@ -0,0 +1,189 @@ +import { globalLogger } from '@lokalise/node-core' +import { Logger } from 'pino' +import { beforeEach, describe, expect, it, MockInstance, vitest } from 'vitest' + +import { BackgroundJobProcessorLogger } from './BackgroundJobProcessorLogger' + +const logger = globalLogger + +describe('BackgroundJobProcessorLogger', () => { + let testLogger: Logger + let backgroundJobProcessorLogger: BackgroundJobProcessorLogger + let jobLogSpy: MockInstance + + beforeEach(() => { + testLogger = logger.child({ test: true }) + const fakeJob = { log: async () => Promise.resolve() } + jobLogSpy = vitest.spyOn(fakeJob, 'log') + backgroundJobProcessorLogger = new BackgroundJobProcessorLogger(testLogger, fakeJob as any) + }) + + describe('level', () => { + it('changes the level of the logger', () => { + backgroundJobProcessorLogger.level = 'debug' + expect(backgroundJobProcessorLogger.level).toBe('debug') + + backgroundJobProcessorLogger.level = 'silent' + expect(backgroundJobProcessorLogger.level).toBe('silent') + + backgroundJobProcessorLogger.level = 'fatal' + expect(backgroundJobProcessorLogger.level).toBe('fatal') + }) + }) + + describe('silent', () => { + let silentSpy: MockInstance + + beforeEach(() => { + testLogger.level = 'silent' + silentSpy = vitest.spyOn(testLogger, 'silent') + }) + + it.each(['test', { msg: 'test', prop: 'prop' }])('log', (testCase) => { + backgroundJobProcessorLogger.silent(testCase) + expect(silentSpy).toHaveBeenCalledWith(testCase, undefined, []) + expect(jobLogSpy).not.toHaveBeenCalled() + }) + }) + + describe('trace', () => { + let traceSpy: MockInstance + + beforeEach(() => { + testLogger.level = 'trace' + traceSpy = vitest.spyOn(testLogger, 'trace') + }) + + it.each(['test', { msg: 'test', prop: 'prop' }, { message: 'test' }])('log', (testCase) => { + backgroundJobProcessorLogger.trace(testCase) + expect(traceSpy).toHaveBeenCalledWith(testCase, undefined, []) + expect(jobLogSpy).toHaveBeenCalledWith('[trace] test') + }) + + it('does not log with upper log level', () => { + testLogger.level = 'debug' + + backgroundJobProcessorLogger.trace({ msg: 'test' }) + expect(jobLogSpy).not.toHaveBeenCalled() + }) + }) + + describe('debug', () => { + let 
debugSpy: MockInstance + + beforeEach(() => { + testLogger.level = 'debug' + debugSpy = vitest.spyOn(testLogger, 'debug') + }) + + it.each(['test', { msg: 'test', prop: 'prop' }, { message: 'test' }])('log', (testCase) => { + backgroundJobProcessorLogger.debug(testCase) + expect(debugSpy).toHaveBeenCalledWith(testCase, undefined, []) + expect(jobLogSpy).toHaveBeenCalledWith('[debug] test') + }) + + it('does not log with upper log level', () => { + testLogger.level = 'info' + + backgroundJobProcessorLogger.debug({ msg: 'test' }) + expect(jobLogSpy).not.toHaveBeenCalled() + }) + }) + + describe('info', () => { + let infoSpy: MockInstance + + beforeEach(() => { + testLogger.level = 'info' + infoSpy = vitest.spyOn(testLogger, 'info') + }) + + it.each(['test', { msg: 'test', prop: 'prop' }, { message: 'test' }])('log', (testCase) => { + backgroundJobProcessorLogger.info(testCase) + expect(infoSpy).toHaveBeenCalledWith(testCase, undefined, []) + expect(jobLogSpy).toHaveBeenCalledWith('[info] test') + }) + + it('does not log with upper log level', () => { + testLogger.level = 'warn' + + backgroundJobProcessorLogger.info({ msg: 'test' }) + expect(jobLogSpy).not.toHaveBeenCalled() + }) + }) + + describe('warn', () => { + let warnSpy: MockInstance + + beforeEach(() => { + testLogger.level = 'warn' + warnSpy = vitest.spyOn(testLogger, 'warn') + }) + + it.each(['test', { msg: 'test', prop: 'prop' }, { message: 'test' }])('log', (testCase) => { + backgroundJobProcessorLogger.warn(testCase) + expect(warnSpy).toHaveBeenCalledWith(testCase, undefined, []) + expect(jobLogSpy).toHaveBeenCalledWith('[warn] test') + }) + + it('does not log with upper log level', () => { + backgroundJobProcessorLogger.level = 'error' + + backgroundJobProcessorLogger.warn({ msg: 'test' }) + expect(jobLogSpy).not.toHaveBeenCalled() + }) + }) + + describe('error', () => { + let errorSpy: MockInstance + + beforeEach(() => { + testLogger.level = 'error' + errorSpy = vitest.spyOn(testLogger, 'error') + }) + + it.each(['test', { msg: 'test', prop: 'prop' }, { message: 'test' }])('log', (testCase) => { + backgroundJobProcessorLogger.error(testCase) + expect(errorSpy).toHaveBeenCalledWith(testCase, undefined, []) + expect(jobLogSpy).toHaveBeenCalledWith('[error] test') + }) + + it('does not log with upper log level', () => { + backgroundJobProcessorLogger.level = 'fatal' + + backgroundJobProcessorLogger.error({ msg: 'test' }) + expect(jobLogSpy).not.toHaveBeenCalled() + }) + }) + + describe('fatal', () => { + let fatalSpy: MockInstance + + beforeEach(() => { + testLogger.level = 'fatal' + fatalSpy = vitest.spyOn(testLogger, 'fatal') + }) + + it.each(['test', { msg: 'test', prop: 'prop' }, { message: 'test' }])('log', (testCase) => { + backgroundJobProcessorLogger.fatal(testCase) + expect(fatalSpy).toHaveBeenCalledWith(testCase, undefined, []) + expect(jobLogSpy).toHaveBeenCalledWith('[fatal] test') + }) + }) + + describe('child', () => { + let childSpy: MockInstance + + beforeEach(() => { + testLogger.level = 'fatal' + childSpy = vitest.spyOn(testLogger, 'child') + }) + + it('creates a child logger', () => { + const result = backgroundJobProcessorLogger.child({ child: true }) + expect(result).toBeInstanceOf(BackgroundJobProcessorLogger) + expect(childSpy).toHaveBeenCalledOnce() + expect(childSpy).toHaveBeenCalledWith({ child: true }, undefined) + }) + }) +}) diff --git a/packages/app/background-jobs-common/src/background-job-processor/BackgroundJobProcessorLogger.ts 
b/packages/app/background-jobs-common/src/background-job-processor/BackgroundJobProcessorLogger.ts new file mode 100644 index 00000000..809ce36f --- /dev/null +++ b/packages/app/background-jobs-common/src/background-job-processor/BackgroundJobProcessorLogger.ts @@ -0,0 +1,86 @@ +import type { Job } from 'bullmq' +import type { BaseLogger, Bindings, ChildLoggerOptions, Logger } from 'pino' +import type pino from 'pino' + +const hasMsgProperty = (obj: unknown): obj is { msg: string } => { + return typeof obj === 'object' && obj !== null && 'msg' in obj && typeof obj.msg === 'string' +} + +const hasMessageProperty = (obj: unknown): obj is { message: string } => { + return ( + typeof obj === 'object' && obj !== null && 'message' in obj && typeof obj.message === 'string' + ) +} + +export class BackgroundJobProcessorLogger implements BaseLogger { + private readonly logger: Logger + private readonly job: Job + + constructor(logger: Logger, job: Job) { + this.logger = logger + this.job = job + } + + get level(): pino.LevelWithSilentOrString { + return this.logger.level + } + + set level(level: pino.LevelWithSilentOrString) { + this.logger.level = level + } + + silent: pino.LogFn = (obj: unknown, msg?: string, ...args: unknown[]) => { + this.logger.silent(obj, msg, args) + // silent should not log on job + } + + trace: pino.LogFn = (obj: unknown, msg?: string, ...args: unknown[]) => { + this.logger.trace(obj, msg, args) + this.jobLog('trace', obj, msg) + } + + debug: pino.LogFn = (obj: unknown, msg?: string, ...args: unknown[]) => { + this.logger.debug(obj, msg, args) + this.jobLog('debug', obj, msg) + } + + info: pino.LogFn = (obj: unknown, msg?: string, ...args: unknown[]) => { + this.logger.info(obj, msg, args) + this.jobLog('info', obj, msg) + } + + warn: pino.LogFn = (obj: unknown, msg?: string, ...args: unknown[]) => { + this.logger.warn(obj, msg, args) + this.jobLog('warn', obj, msg) + } + + error: pino.LogFn = (obj: unknown, msg?: string, ...args: unknown[]) => { + this.logger.error(obj, msg, args) + this.jobLog('error', obj, msg) + } + + fatal: pino.LogFn = (obj: unknown, msg?: string, ...args: unknown[]) => { + this.logger.fatal(obj, msg, args) + this.jobLog('fatal', obj, msg) + } + + child(bindings: Bindings, options?: ChildLoggerOptions): BaseLogger { + return new BackgroundJobProcessorLogger(this.logger.child(bindings, options), this.job) + } + + private jobLog(level: pino.Level, obj: unknown, msg?: string) { + const levelValue = this.logger.levels.values[level] + if (levelValue < this.logger.levelVal) return + + let message: string | undefined + + if (typeof obj === 'string') message = obj + else if (hasMsgProperty(obj)) message = obj.msg + else if (hasMessageProperty(obj)) message = obj.message + else message = msg + + if (!message) return + + void this.job.log(`[${level}] ${message}`).catch(() => undefined) // in case of error just ignore + } +} diff --git a/packages/app/background-jobs-common/src/background-job-processor/constants.ts b/packages/app/background-jobs-common/src/background-job-processor/constants.ts new file mode 100644 index 00000000..c02d42b6 --- /dev/null +++ b/packages/app/background-jobs-common/src/background-job-processor/constants.ts @@ -0,0 +1,14 @@ +/** + * How many days we retain completed jobs + */ +export const RETENTION_COMPLETED_JOBS_IN_DAYS = 3 + +/** + * How many days we retain failed jobs + */ +export const RETENTION_FAILED_JOBS_IN_DAYS = 7 + +/** + * How many days we retain queue ids + */ +export const RETENTION_QUEUE_IDS_IN_DAYS = 14 diff --git 
a/packages/app/background-jobs-common/src/background-job-processor/index.ts b/packages/app/background-jobs-common/src/background-job-processor/index.ts new file mode 100644 index 00000000..61e75e26 --- /dev/null +++ b/packages/app/background-jobs-common/src/background-job-processor/index.ts @@ -0,0 +1,4 @@ +export * from './types' +export * from './processors/AbstractBackgroundJobProcessor' +export * from './processors/FakeBackgroundJobProcessor' +export * from './processors/spy/types' diff --git a/packages/app/background-jobs-common/src/background-job-processor/processors/AbstractBackgroundJobProcessor.spec.ts b/packages/app/background-jobs-common/src/background-job-processor/processors/AbstractBackgroundJobProcessor.spec.ts new file mode 100644 index 00000000..47b1dd93 --- /dev/null +++ b/packages/app/background-jobs-common/src/background-job-processor/processors/AbstractBackgroundJobProcessor.spec.ts @@ -0,0 +1,346 @@ +import { generateMonotonicUuid } from '@lokalise/id-utils' +import { waitAndRetry } from '@lokalise/node-core' +import { UnrecoverableError } from 'bullmq' +import { symbols } from 'pino' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +import { DependencyMocks, lastInfoSpy } from '../../../test/dependencyMocks' +import { TestFailingBackgroundJobProcessor } from '../../../test/processors/TestFailingBackgroundJobProcessor' +import { TestStalledBackgroundJobProcessor } from '../../../test/processors/TestStalledBackgroundJobProcessor' +import { RETENTION_QUEUE_IDS_IN_DAYS } from '../constants' +import { BackgroundJobProcessorDependencies } from '../types' +import { daysToMilliseconds } from '../utils' + +import { AbstractBackgroundJobProcessor } from './AbstractBackgroundJobProcessor' +import { FakeBackgroundJobProcessor } from './FakeBackgroundJobProcessor' + +type JobData = { + id: string + value: string +} + +const QUEUE_IDS_KEY = 'background-jobs-common:background-job:queues' + +describe('AbstractBackgroundJobProcessor', () => { + let mocks: DependencyMocks + let deps: BackgroundJobProcessorDependencies + + beforeEach(() => { + mocks = new DependencyMocks() + deps = mocks.create() + }) + + afterEach(async () => { + await mocks.dispose() + }) + + describe('getActiveQueueIds', () => { + beforeEach(async () => { + await deps.redis.del(QUEUE_IDS_KEY) + }) + + it('returns not expired elements on the set', async () => { + const retentionMs = daysToMilliseconds(RETENTION_QUEUE_IDS_IN_DAYS) + await deps.redis.zadd(QUEUE_IDS_KEY, Date.now() - retentionMs, 'expired') + await deps.redis.zadd(QUEUE_IDS_KEY, Date.now(), 'queue2') + await deps.redis.zadd(QUEUE_IDS_KEY, Date.now() - retentionMs + 100, 'queue1') + + const queues = await AbstractBackgroundJobProcessor.getActiveQueueIds(deps.redis) + expect(queues).toEqual(['queue1', 'queue2']) + }) + }) + + describe('start', () => { + beforeEach(async () => { + await deps.redis.del(QUEUE_IDS_KEY) + }) + + it('throws an error if queue id is not unique', async () => { + const job1 = new FakeBackgroundJobProcessor(deps, 'queue1') + const job2 = new FakeBackgroundJobProcessor(deps, 'queue2') + + await job1.start() + await job2.start() + await expect(new FakeBackgroundJobProcessor(deps, 'queue1').start()).rejects.toThrow( + /Queue id "queue1" is not unique/, + ) + + await job1.dispose() + await job2.dispose() + }) + + it('schedule throws error if called before processor started', async () => { + const processor = new FakeBackgroundJobProcessor(deps, 'queue1') + + await expect(processor.schedule({ id: 'test_id', 
value: 'test' })).rejects.toThrow( + 'Job queue "queue1" is not initialized. Please call "start" method before scheduling jobs.', + ) + }) + + it('scheduleBulk throws error if called before processor started', async () => { + const processor = new FakeBackgroundJobProcessor(deps, 'queue1') + + await expect(processor.scheduleBulk([{ id: 'test_id', value: 'test' }])).rejects.toThrow( + 'Job queue "queue1" is not initialized. Please call "start" method before scheduling jobs.', + ) + }) + + it('queue id is stored/updated on redis with current timestamp', async () => { + const processor = new FakeBackgroundJobProcessor(deps, 'queue1') + await processor.start() + + const today = new Date() + const [value, score] = await deps.redis.zrange(QUEUE_IDS_KEY, 0, -1, 'WITHSCORES') + expect(value).toBe('queue1') + // Comparing timestamps in seconds + const todaySeconds = Math.floor(today.getTime() / 1000) + const scoreSeconds = Math.floor(new Date(parseInt(score)).getTime() / 1000) + // max difference 1 to handle edge case of 0.1 - 1.0 + expect(scoreSeconds - todaySeconds).lessThanOrEqual(1) + + // disposing and restarting to check that timestamp is updated + await processor.dispose() + await processor.start() + + const [value2, score2] = await deps.redis.zrange(QUEUE_IDS_KEY, 0, -1, 'WITHSCORES') + expect(value2).toBe('queue1') + expect(new Date(parseInt(score))).not.toEqual(new Date(parseInt(score2))) + + await processor.dispose() + }) + }) + + describe('success', () => { + const QueueName = 'AbstractBackgroundJobProcessor_success' + let processor: FakeBackgroundJobProcessor + + beforeEach(async () => { + processor = new FakeBackgroundJobProcessor(deps, QueueName) + await processor.start() + }) + + afterEach(async () => { + await processor.dispose() + }) + + it('runs the job logging with autogenerated id', async () => { + const jobData = { id: generateMonotonicUuid(), value: 'test' } + const jobId = await processor.schedule(jobData) + + const job = await processor.spy.waitForJobWithId(jobId, 'completed') + expect(job.data).toMatchObject(jobData) + + expect(lastInfoSpy).toHaveBeenCalledTimes(2) + expect(lastInfoSpy.mock.calls[0]).toMatchObject([ + { + origin: 'FakeBackgroundJobProcessor', + jobId, + jobData, + }, + 'Started job FakeBackgroundJobProcessor', + [], + ]) + expect(lastInfoSpy.mock.calls[1]).toMatchObject([ + { + isSuccess: true, + jobId, + }, + 'Finished job FakeBackgroundJobProcessor', + [], + ]) + }) + + it('schedules and runs multiple jobs', async () => { + const scheduledJobIds = await processor.scheduleBulk([ + { id: generateMonotonicUuid(), value: 'first' }, + { id: generateMonotonicUuid(), value: 'second' }, + ]) + + expect(scheduledJobIds.length).toBe(2) + + const firstJob = await processor.spy.waitForJobWithId(scheduledJobIds[0], 'completed') + const secondJob = await processor.spy.waitForJobWithId(scheduledJobIds[1], 'completed') + + expect(firstJob.data.value).toBe('first') + expect(secondJob.data.value).toBe('second') + }) + + it('stops the worker on dispose', async () => { + // Given + const jobData = { id: generateMonotonicUuid(), value: 'test' } + + await processor.schedule(jobData) + const job = await processor.spy.waitForJob((data) => data.id === jobData.id, 'completed') + expect(job.data).toMatchObject(jobData) + + // When + await processor.dispose() + const logSpy = vi.spyOn(deps.logger, 'info') + + // Then + await processor.schedule({ id: generateMonotonicUuid(), value: 'test' }) + + // Further scheduled jobs are not executed + await waitAndRetry(() => 
logSpy.mock.calls.length > 0) + expect(logSpy).not.toHaveBeenCalled() + }) + }) + + describe('error', () => { + const QueueName = 'AbstractBackgroundJobProcessor_error' + let processor: TestFailingBackgroundJobProcessor + + beforeEach(async () => { + processor = new TestFailingBackgroundJobProcessor(deps, QueueName) + await processor.start() + }) + + afterEach(async () => { + await processor.dispose() + }) + + it('job is throwing normal errors', async () => { + const errors = [ + new Error('normal test error 1'), + new Error('normal test error 2'), + new Error('normal test error 3'), + ] + processor.errorsToThrowOnProcess = errors + const scheduledJobId = await processor.schedule( + { id: 'test_id', value: 'test' }, + { + attempts: 3, + delay: 0, + }, + ) + const job = await processor.spy.waitForJobWithId(scheduledJobId, 'failed') + + expect(processor.errorsOnProcess).length(1) + expect(job.attemptsMade).toBe(3) + expect(processor.errorsOnProcess[0]).toMatchObject(errors[2]) + + // @ts-expect-error accessing pino internals + expect(processor.lastLogger[symbols.chindingsSym]).toContain('"x-request-id"') + }) + + it('job throws unrecoverable error at the beginning', async () => { + const errors = [new UnrecoverableError('unrecoverable test error 1')] + processor.errorsToThrowOnProcess = errors + await processor.schedule( + { id: 'test_id', value: 'test' }, + { + attempts: 3, + delay: 0, + }, + ) + + const job = await processor.spy.waitForJob((data) => data.id === 'test_id', 'failed') + + expect(processor.errorsOnProcess).length(1) + expect(job.attemptsMade).toBe(1) + expect(processor.errorsOnProcess[0]).toMatchObject(errors[0]) + }) + + it('job throws unrecoverable error in the middle', async () => { + const errors = [ + new Error('normal test error 1'), + new UnrecoverableError('unrecoverable test error 2'), + ] + processor.errorsToThrowOnProcess = errors + await processor.schedule( + { id: 'test_id', value: 'test' }, + { + attempts: 3, + delay: 0, + }, + ) + + const job = await processor.spy.waitForJob((data) => data.id === 'test_id', 'failed') + + expect(processor.errorsOnProcess).length(1) + expect(job.attemptsMade).toBe(2) + expect(processor.errorsOnProcess[0]).toMatchObject(errors[1]) + }) + + it('error is triggered on failed hook', async () => { + const onFailedError = new Error('onFailed error') + processor.errorToThrowOnFailed = onFailedError + processor.errorsToThrowOnProcess = [new UnrecoverableError('unrecoverable error')] + + const reportSpy = vi.spyOn(deps.errorReporter, 'report') + + await processor.schedule( + { id: 'test_id', value: 'test' }, + { + attempts: 3, + delay: 0, + }, + ) + + const job = await processor.spy.waitForJob((data) => data.id === 'test_id', 'failed') + + expect(processor.errorsOnProcess).length(1) + expect(reportSpy).toHaveBeenCalledWith({ + error: onFailedError, + context: { + id: job.id, + errorJson: expect.stringContaining(onFailedError.message), + }, + }) + }) + }) + + describe('stalled', () => { + let processor: TestStalledBackgroundJobProcessor + + beforeEach(async () => { + processor = new TestStalledBackgroundJobProcessor(deps) + await processor.start() + }) + + afterEach(async () => { + await processor.dispose() + }) + + it('handling stalled errors', async () => { + const errorReporterSpy = vi.spyOn(deps.errorReporter, 'report') + const jobData = { id: generateMonotonicUuid() } + const jobId = await processor.schedule(jobData) + + await waitAndRetry(() => processor.onFailedErrors.length > 0, 100, 20) + expect(processor?.onFailedErrors).length(1) 
+ + const onFailedCall = processor?.onFailedErrors[0] + expect(onFailedCall.error.message).toBe('job stalled more than allowable limit') + expect(onFailedCall.job.id).toBe(jobId) + expect(onFailedCall.job.data.id).toBe(jobData.id) + expect(onFailedCall.job.attemptsMade).toBe(0) + + expect(errorReporterSpy).toHaveBeenCalledWith({ + error: onFailedCall.error, + context: { + id: jobId, + errorJson: expect.stringContaining(onFailedCall.error.message), + }, + }) + // @ts-expect-error accessing pino internals + expect(processor.lastLogger[symbols.chindingsSym]).toContain('"x-request-id"') + }) + }) + + describe('spy', () => { + it('throws error when spy accessed in non-test mode', async () => { + const processor = new TestFailingBackgroundJobProcessor( + deps, + 'AbstractBackgroundJobProcessor_spy', + false, + ) + + expect(() => processor.spy).throws( + 'spy was not instantiated, it is only available on test mode. Please use `config.isTest` to enable it.', + ) + + await processor.dispose() + }) + }) +}) diff --git a/packages/app/background-jobs-common/src/background-job-processor/processors/AbstractBackgroundJobProcessor.ts b/packages/app/background-jobs-common/src/background-job-processor/processors/AbstractBackgroundJobProcessor.ts new file mode 100644 index 00000000..f8b92ee3 --- /dev/null +++ b/packages/app/background-jobs-common/src/background-job-processor/processors/AbstractBackgroundJobProcessor.ts @@ -0,0 +1,333 @@ +import { generateMonotonicUuid } from '@lokalise/id-utils' +import type { ErrorReporter } from '@lokalise/node-core' +import { resolveGlobalErrorLogObject } from '@lokalise/node-core' +import { UnrecoverableError } from 'bullmq' +import type { Queue, Worker, WorkerOptions, JobsOptions, Job, QueueOptions } from 'bullmq' +import type Redis from 'ioredis' +import type { BaseLogger, Logger } from 'pino' +import pino from 'pino' +import { merge } from 'ts-deepmerge' + +import { BackgroundJobProcessorLogger } from '../BackgroundJobProcessorLogger' +import { + RETENTION_COMPLETED_JOBS_IN_DAYS, + RETENTION_FAILED_JOBS_IN_DAYS, + RETENTION_QUEUE_IDS_IN_DAYS, +} from '../constants' +import type { + BackgroundJobProcessorConfig, + BackgroundJobProcessorDependencies, + BullmqProcessor, + TransactionObservabilityManager, +} from '../types' +import { daysToMilliseconds, daysToSeconds, isStalledJobError, resolveJobId } from '../utils' + +import type { AbstractBullmqFactory } from './factories/AbstractBullmqFactory' +import { BackgroundJobProcessorSpy } from './spy/BackgroundJobProcessorSpy' +import type { BackgroundJobProcessorSpyInterface } from './spy/types' + +export interface RequestContext { + logger: BaseLogger + reqId: string +} + +/** + * Default config + * - Retry config: 3 retries with 30s of total amount of wait time between retries using + * exponential strategy https://docs.bullmq.io/guide/retrying-failing-jobs#built-in-backoff-strategies + * - Job retention: 3 days for completed jobs, 7 days for failed jobs + */ +const DEFAULT_JOB_CONFIG: JobsOptions = { + attempts: 3, + backoff: { + type: 'exponential', + delay: 5000, + }, + removeOnComplete: { + age: daysToSeconds(RETENTION_COMPLETED_JOBS_IN_DAYS), + }, + removeOnFail: { + age: daysToSeconds(RETENTION_FAILED_JOBS_IN_DAYS), + }, +} + +const QUEUE_IDS_KEY = 'background-jobs-common:background-job:queues' + +const queueIdsSet = new Set() + +const DEFAULT_WORKER_OPTIONS = { + concurrency: 10, + maxStalledCount: 3, // same as default attempts by default + ttl: 60, +} as const satisfies Omit & { ttl: number } + +export abstract 
class AbstractBackgroundJobProcessor< + JobPayload extends object, + JobReturn = void, + JobType extends Job = Job, + QueueType extends Queue = Queue, + QueueOptionsType extends QueueOptions = QueueOptions, + WorkerType extends Worker = Worker, + WorkerOptionsType extends WorkerOptions = WorkerOptions, + ProcessorType extends BullmqProcessor = BullmqProcessor< + JobType, + JobPayload, + JobReturn + >, + JobOptionsType extends JobsOptions = JobsOptions, +> { + protected readonly logger: Logger + + private readonly redis: Redis + private readonly newRelicBackgroundTransactionManager: TransactionObservabilityManager + private readonly errorReporter: ErrorReporter + private readonly config: BackgroundJobProcessorConfig + + private queue?: QueueType + private worker?: WorkerType + protected _spy?: BackgroundJobProcessorSpy + private factory: AbstractBullmqFactory< + QueueType, + QueueOptionsType, + WorkerType, + WorkerOptionsType, + ProcessorType, + JobType, + JobPayload, + JobReturn + > + + protected constructor( + dependencies: BackgroundJobProcessorDependencies< + JobPayload, + JobReturn, + JobType, + QueueType, + QueueOptionsType, + WorkerType, + WorkerOptionsType, + ProcessorType + >, + config: BackgroundJobProcessorConfig, + ) { + this.config = config + this.factory = dependencies.bullmqFactory + this.redis = dependencies.redis + this.newRelicBackgroundTransactionManager = dependencies.transactionObservabilityManager + this.logger = dependencies.logger + this.errorReporter = dependencies.errorReporter + } + + public static async getActiveQueueIds(redis: Redis): Promise { + await redis.zremrangebyscore( + QUEUE_IDS_KEY, + '-inf', + Date.now() - daysToMilliseconds(RETENTION_QUEUE_IDS_IN_DAYS), + ) + const queueIds = await redis.zrange(QUEUE_IDS_KEY, 0, -1) + return queueIds.sort() + } + + public get spy(): BackgroundJobProcessorSpyInterface { + if (!this._spy) + throw new Error( + 'spy was not instantiated, it is only available on test mode. 
Please use `config.isTest` to enable it.', + ) + + return this._spy + } + + public async start(): Promise { + if (queueIdsSet.has(this.config.queueId)) + throw new Error(`Queue id "${this.config.queueId}" is not unique.`) + + queueIdsSet.add(this.config.queueId) + await this.redis.zadd(QUEUE_IDS_KEY, Date.now(), this.config.queueId) + + this.queue = this.factory.buildQueue(this.config.queueId, { + connection: this.redis, + ...this.config.queueOptions, + } as QueueOptionsType) + await this.queue.waitUntilReady() + + const mergedWorkerOptions = merge( + DEFAULT_WORKER_OPTIONS, + this.config.workerOptions, + ) as unknown as Omit + this.worker = this.factory.buildWorker( + this.config.queueId, + (async (job: JobType) => { + return await this.processInternal(job) + }) as ProcessorType, + { + ...mergedWorkerOptions, + connection: this.redis, + } as WorkerOptionsType, + ) + this.worker.on('failed', (job, error) => { + if (!job) return // Should not be possible with our current config, check 'failed' for more info + // @ts-expect-error + this.handleFailedEvent(job, error) + }) + + if (this.config.isTest) { + // unlike queue, the docs for worker state that this is only useful in tests + await this.worker.waitUntilReady() + this._spy = new BackgroundJobProcessorSpy() + } + } + + public async dispose(): Promise { + queueIdsSet.delete(this.config.queueId) + + try { + // On test forcing the worker to close to not wait for current job to finish + await this.worker?.close(this.config.isTest) + await this.queue?.close() + } catch { + // do nothing + } + } + + public async schedule(jobData: JobPayload, options?: JobOptionsType): Promise { + const jobIds = await this.scheduleBulk([jobData], options) + return jobIds[0] + } + + public async scheduleBulk(jobData: JobPayload[], options?: JobOptionsType): Promise { + const jobs = await this.initializedQueue.addBulk( + jobData.map((data) => ({ + name: this.constructor.name, + data, + opts: this.prepareJobOptions(options ?? ({} as JobOptionsType)), + })), + ) + + if (!jobs.every((job) => !!job.id)) { + // Practically unreachable, but we want to simplify the signature of the method and avoid + // stating that it could return undefined. 
+ throw new Error('Some scheduled job IDs are undefined') + } + + return jobs.map((job) => job.id as string) + } + + private prepareJobOptions(options: JobOptionsType): JobOptionsType { + const preparedOptions: JobOptionsType = { + jobId: generateMonotonicUuid(), + ...DEFAULT_JOB_CONFIG, + ...options, + } + + if (this.config.isTest && typeof preparedOptions.backoff === 'object') { + preparedOptions.backoff.delay = 1 + preparedOptions.backoff.type = 'fixed' + preparedOptions.removeOnFail = true + preparedOptions.removeOnComplete = true + } + + return preparedOptions + } + + private async processInternal(job: JobType) { + const jobId = resolveJobId(job) + let isSuccess = false + const requestContext: RequestContext = { + logger: new BackgroundJobProcessorLogger(this.resolveExecutionLogger(jobId), job), + reqId: jobId, + } + + try { + this.newRelicBackgroundTransactionManager.start(job.name) + requestContext.logger.info( + { + origin: this.constructor.name, + jobId, + jobData: job.data, + }, + `Started job ${job.name}`, + ) + + const result = await this.process(job, requestContext) + isSuccess = true + + this._spy?.addJobProcessingResult(job, 'completed') + return result + } finally { + requestContext.logger.info({ isSuccess, jobId }, `Finished job ${job.name}`) + this.newRelicBackgroundTransactionManager.stop(job.name) + } + } + + private handleFailedEvent(job: JobType, error: Error) { + const jobId = resolveJobId(job) + const requestContext: RequestContext = { + logger: new BackgroundJobProcessorLogger(this.resolveExecutionLogger(jobId), job), + reqId: jobId, + } + + requestContext.logger.error(resolveGlobalErrorLogObject(error, jobId)) + this.errorReporter.report({ + error, + context: { + id: jobId, + errorJson: JSON.stringify(pino.stdSerializers.err(error)), + }, + }) + + if ( + error instanceof UnrecoverableError || + isStalledJobError(error) || + job.opts.attempts === job.attemptsMade + ) { + void this.internalOnFailed(job, error, requestContext).catch(() => undefined) // nothing to do in case of error + } + } + + private async internalOnFailed( + job: JobType, + error: Error, + requestContext: RequestContext, + ): Promise { + try { + await this.onFailed(job, error, requestContext) + } catch (error) { + requestContext.logger.error(resolveGlobalErrorLogObject(error, job.id)) + + if (error instanceof Error) { + this.errorReporter.report({ + error, + context: { + id: job.id, + errorJson: JSON.stringify(pino.stdSerializers.err(error)), + }, + }) + } + } + + this._spy?.addJobProcessingResult(job, 'failed') + } + + protected resolveExecutionLogger(jobId: string) { + return this.logger.child({ 'x-request-id': jobId }) + } + + private get initializedQueue(): QueueType { + if (!this.queue) + throw new Error( + `Job queue "${this.config.queueId}" is not initialized. 
Please call "start" method before scheduling jobs.`, + ) + + return this.queue + } + + protected abstract process( + job: Job, + requestContext: RequestContext, + ): Promise + protected abstract onFailed( + job: Job, + error: Error, + requestContext: RequestContext, + ): Promise +} diff --git a/packages/app/background-jobs-common/src/background-job-processor/processors/FakeBackgroundJobProcessor.spec.ts b/packages/app/background-jobs-common/src/background-job-processor/processors/FakeBackgroundJobProcessor.spec.ts new file mode 100644 index 00000000..c0001b0f --- /dev/null +++ b/packages/app/background-jobs-common/src/background-job-processor/processors/FakeBackgroundJobProcessor.spec.ts @@ -0,0 +1,42 @@ +import { afterEach, beforeEach, describe, expect, it } from 'vitest' + +import { DependencyMocks } from '../../../test/dependencyMocks' +import { BackgroundJobProcessorDependencies } from '../types' + +import { FakeBackgroundJobProcessor } from './FakeBackgroundJobProcessor' + +type JobData = { + value: string +} + +// Adding test to deprecated methods +describe('FakeBackgroundJobProcessor', () => { + const QueueName = 'AbstractBackgroundJobProcessor_success' + let mocks: DependencyMocks + let deps: BackgroundJobProcessorDependencies + let processor: FakeBackgroundJobProcessor + + beforeEach(async () => { + mocks = new DependencyMocks() + deps = mocks.create() + processor = new FakeBackgroundJobProcessor(deps, QueueName) + await processor.start() + }) + + afterEach(async () => { + await processor.dispose() + await mocks.dispose() + }) + + it('process calls and clean works', async () => { + const data = { value: 'test' } + await processor.schedule(data) + + await processor.spy?.waitForJob((data) => data.value === 'test', 'completed') + expect(processor.processCalls).toHaveLength(1) + expect(processor.processCalls[0]).toEqual(data) + + processor.clean() + expect(processor.processCalls).toHaveLength(0) + }) +}) diff --git a/packages/app/background-jobs-common/src/background-job-processor/processors/FakeBackgroundJobProcessor.ts b/packages/app/background-jobs-common/src/background-job-processor/processors/FakeBackgroundJobProcessor.ts new file mode 100644 index 00000000..6bd020ec --- /dev/null +++ b/packages/app/background-jobs-common/src/background-job-processor/processors/FakeBackgroundJobProcessor.ts @@ -0,0 +1,47 @@ +import type { Job } from 'bullmq' + +import type { BackgroundJobProcessorDependencies } from '../types' + +import { AbstractBackgroundJobProcessor } from './AbstractBackgroundJobProcessor' + +export class FakeBackgroundJobProcessor< + T extends object, +> extends AbstractBackgroundJobProcessor { + private _processCalls: T[] = [] + + constructor( + dependencies: BackgroundJobProcessorDependencies, + queueName: string, + isTest = true, + ) { + super(dependencies, { + queueId: queueName, + isTest, + workerOptions: { + concurrency: 1, + }, + }) + } + protected override process(job: Job): Promise { + this._processCalls.push(job.data) + return Promise.resolve(undefined) + } + + protected onFailed(_job: Job, _error: Error): Promise { + return Promise.resolve(undefined) + } + + /** + * @deprecated use job spy instead + */ + public get processCalls(): T[] { + return this._processCalls + } + + /** + * @deprecated use job spy instead + */ + public clean(): void { + this._processCalls = [] + } +} diff --git a/packages/app/background-jobs-common/src/background-job-processor/processors/factories/AbstractBullmqFactory.ts 
b/packages/app/background-jobs-common/src/background-job-processor/processors/factories/AbstractBullmqFactory.ts new file mode 100644 index 00000000..232d670b --- /dev/null +++ b/packages/app/background-jobs-common/src/background-job-processor/processors/factories/AbstractBullmqFactory.ts @@ -0,0 +1,21 @@ +import type { Job, Queue, QueueOptions, Worker, WorkerOptions } from 'bullmq' + +import type { BullmqProcessor } from '../../types' + +export abstract class AbstractBullmqFactory< + QueueType extends Queue, + QueueOptionsType extends QueueOptions, + WorkerType extends Worker, + WorkerOptionsType extends WorkerOptions, + ProcessorType extends BullmqProcessor, + JobType extends Job, + JobPayload extends object, + JobReturn, +> { + abstract buildQueue(queueId: string, options?: QueueOptionsType): QueueType + abstract buildWorker( + queueId: string, + processor?: ProcessorType, + options?: WorkerOptionsType, + ): WorkerType +} diff --git a/packages/app/background-jobs-common/src/background-job-processor/processors/factories/CommonBullmqFactory.ts b/packages/app/background-jobs-common/src/background-job-processor/processors/factories/CommonBullmqFactory.ts new file mode 100644 index 00000000..9c837f21 --- /dev/null +++ b/packages/app/background-jobs-common/src/background-job-processor/processors/factories/CommonBullmqFactory.ts @@ -0,0 +1,32 @@ +import type { Processor, QueueOptions, WorkerOptions, Job } from 'bullmq' +import { Queue, Worker } from 'bullmq' + +import type { BullmqProcessor } from '../../types' + +import type { AbstractBullmqFactory } from './AbstractBullmqFactory' + +export class CommonBullmqFactory + implements + AbstractBullmqFactory< + Queue, + QueueOptions, + Worker, + WorkerOptions, + BullmqProcessor, + Job, + JobPayload, + JobReturn + > +{ + buildQueue(queueId: string, options: QueueOptions): Queue { + return new Queue(queueId, options) + } + + buildWorker( + queueId: string, + processor?: string | URL | null | Processor, + options?: WorkerOptions, + ): Worker { + return new Worker(queueId, processor, options) + } +} diff --git a/packages/app/background-jobs-common/src/background-job-processor/processors/spy/BackgroundJobProcessorSpy.spec.ts b/packages/app/background-jobs-common/src/background-job-processor/processors/spy/BackgroundJobProcessorSpy.spec.ts new file mode 100644 index 00000000..8b845be5 --- /dev/null +++ b/packages/app/background-jobs-common/src/background-job-processor/processors/spy/BackgroundJobProcessorSpy.spec.ts @@ -0,0 +1,140 @@ +import { generateMonotonicUuid } from '@lokalise/id-utils' +import { Job } from 'bullmq' +import { beforeAll, beforeEach, describe, expect, it } from 'vitest' + +import { BackgroundJobProcessorSpy } from './BackgroundJobProcessorSpy' + +type JobData = { value: string } + +describe('BackgroundJobProcessorSpy', () => { + let spy: BackgroundJobProcessorSpy + + beforeAll(() => { + spy = new BackgroundJobProcessorSpy() + }) + + beforeEach(() => { + spy.clear() + }) + + describe('waitForJobWithId', () => { + it('throws error when id is not defined or empty', async () => { + await expect( + async () => await spy.waitForJobWithId(undefined, 'completed'), + ).rejects.toThrowError('Job id is not defined or empty') + await expect(async () => await spy.waitForJobWithId('', 'completed')).rejects.toThrowError( + 'Job id is not defined or empty', + ) + }) + + it('existing job is returned immediately', async () => { + const id = generateMonotonicUuid() + spy.addJobProcessingResult(createFakeJob({ value: 'test' }, id), 'completed') + + 
const result = await spy.waitForJobWithId(id, 'completed') + expect(result.id).toBe(id) + }) + + it('non existing job creates promise', async () => { + const id = generateMonotonicUuid() + const promise = spy.waitForJobWithId(id, 'completed') + await expect(isPromiseFinished(promise)).resolves.toBe(false) + + spy.addJobProcessingResult(createFakeJob({ value: 'test' }, id), 'completed') + + const result = await promise + expect(result.id).toBe(id) + }) + + it('promise is resolved when the job pass to the right state', async () => { + const id = generateMonotonicUuid() + const promise1 = spy.waitForJobWithId(id, 'completed') + const promise2 = spy.waitForJobWithId(id, 'failed') + await expect(isPromiseFinished(promise1)).resolves.toBe(false) + await expect(isPromiseFinished(promise2)).resolves.toBe(false) + + spy.addJobProcessingResult(createFakeJob({ value: 'test' }, id), 'failed') + + await expect(isPromiseFinished(promise1)).resolves.toBe(false) + await expect(isPromiseFinished(promise2)).resolves.toBe(true) + + const result = await promise2 + expect(result.id).toBe(id) + }) + }) + + describe('waitForJob', () => { + it('existing job is returned immediately', async () => { + spy.addJobProcessingResult(createFakeJob({ value: 'test_1' }), 'completed') + + const result = await spy.waitForJob((data) => data.value === 'test_1', 'completed') + expect(result.data.value).toBe('test_1') + }) + + it('non existing job creates promise', async () => { + const promise = spy.waitForJob((data) => data.value === 'test_2', 'completed') + await expect(isPromiseFinished(promise)).resolves.toBe(false) + + spy.addJobProcessingResult(createFakeJob({ value: 'test_2' }), 'completed') + + const result = await promise + expect(result.data.value).toBe('test_2') + }) + + it('promise is resolved when the job pass to the right state', async () => { + const promise1 = spy.waitForJob((data) => data.value === 'test_3', 'completed') + const promise2 = spy.waitForJob((data) => data.value === 'test_3', 'failed') + await expect(isPromiseFinished(promise1)).resolves.toBe(false) + await expect(isPromiseFinished(promise2)).resolves.toBe(false) + + spy.addJobProcessingResult(createFakeJob({ value: 'test_3' }), 'failed') + + await expect(isPromiseFinished(promise1)).resolves.toBe(false) + await expect(isPromiseFinished(promise2)).resolves.toBe(true) + + const result = await promise2 + expect(result.data.value).toBe('test_3') + }) + + it('promise is not resolved until the selector condition is met', async () => { + const promise = spy.waitForJob((data) => data.value === 'expected', 'completed') + await expect(isPromiseFinished(promise)).resolves.toBe(false) + + const job = createFakeJob({ value: 'wrong' }) + spy.addJobProcessingResult(job, 'completed') + await expect(isPromiseFinished(promise)).resolves.toBe(false) + + job.data.value = 'expected' + spy.addJobProcessingResult(job, 'completed') + const result = await promise + expect(result.id).toBe(job.id) + }) + }) + + describe('clean', () => { + it('clean works', async () => { + const promise = spy.waitForJob((data) => data.value === 'test', 'completed') + + spy.clear() + + spy.addJobProcessingResult(createFakeJob({ value: 'test' }), 'completed') + await expect(isPromiseFinished(promise)).resolves.toBe(false) + }) + }) +}) + +const isPromiseFinished = (promise: Promise): Promise => { + return Promise.race([ + new Promise((done) => setTimeout(() => done(false), 1000)), + promise.then(() => true).catch(() => true), + ]) +} + +const createFakeJob = (data: T, id?: string): Job => { + 
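The spec above never sleeps to find out whether a spy promise is still pending; it races the promise against a timer, which is what `isPromiseFinished` does. The snippet below restates that technique as a standalone helper with the timeout made an explicit parameter; the names `hasSettled` and `pending` are illustrative, not part of the package.

```ts
// Resolves to true if `promise` settles (fulfils or rejects) before `timeoutMs`
// elapses, and to false otherwise. The catch() branch lets a rejected promise
// still count as "finished" instead of rejecting the race itself.
const hasSettled = <T>(promise: Promise<T>, timeoutMs = 1000): Promise<boolean> =>
  Promise.race([
    new Promise<boolean>((resolve) => setTimeout(() => resolve(false), timeoutMs)),
    promise.then(() => true).catch(() => true),
  ])

// Usage sketch: a promise that resolves after 50 ms looks unfinished at 10 ms
// and finished at 200 ms.
const pending = new Promise<void>((resolve) => setTimeout(resolve, 50))
void hasSettled(pending, 10).then((settled) => console.log(settled)) // false
void hasSettled(pending, 200).then((settled) => console.log(settled)) // true
```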
return { + id: id ?? generateMonotonicUuid(), + data, + attemptsMade: 0, + progress: 100, + } as Job +} diff --git a/packages/app/background-jobs-common/src/background-job-processor/processors/spy/BackgroundJobProcessorSpy.ts b/packages/app/background-jobs-common/src/background-job-processor/processors/spy/BackgroundJobProcessorSpy.ts new file mode 100644 index 00000000..135f383b --- /dev/null +++ b/packages/app/background-jobs-common/src/background-job-processor/processors/spy/BackgroundJobProcessorSpy.ts @@ -0,0 +1,106 @@ +import { removeNullish } from '@lokalise/node-core' +import type { Job } from 'bullmq' + +import type { JobFinalState } from '../../types' + +import type { BackgroundJobProcessorSpyInterface, JobSpyResult, JobDataSelector } from './types' + +type JobProcessingResult = { + job: JobSpyResult + state?: JobFinalState +} +type PromiseResolve = (value: T | PromiseLike) => void +type JobSelector = (job: Job) => boolean +type SpyPromise = { + selector: JobSelector + awaitedState: JobFinalState + resolve: PromiseResolve> + promise: Promise> +} + +export class BackgroundJobProcessorSpy + implements BackgroundJobProcessorSpyInterface +{ + private readonly jobProcessingResults: Map> + private promises: SpyPromise[] + + constructor() { + this.jobProcessingResults = new Map() + this.promises = [] + } + + clear(): void { + this.jobProcessingResults.clear() + this.promises = [] + } + + waitForJobWithId( + id: string | undefined, + awaitedState: JobFinalState, + ): Promise> { + if (!id) { + throw new Error('Job id is not defined or empty') + } + + const result = this.jobProcessingResults.get(id) + if (result && result.state === awaitedState) { + return Promise.resolve(result.job) + } + + return this.registerPromise((job) => job.id === id, awaitedState) + } + + waitForJob( + jobSelector: JobDataSelector, + awaitedState: JobFinalState, + ): Promise> { + const result = Array.from(this.jobProcessingResults.values()).find( + (spy) => jobSelector(spy.job.data) && spy.state === awaitedState, + ) + if (result) { + return Promise.resolve(result.job) + } + + return this.registerPromise((job) => jobSelector(job.data), awaitedState) + } + + private async registerPromise( + selector: JobSelector, + state: JobFinalState, + ): Promise> { + let resolve: PromiseResolve> + const promise = new Promise>((_resolve) => { + resolve = _resolve + }) + // @ts-ignore + this.promises.push({ selector, awaitedState: state, resolve, promise }) + + return promise + } + + /** + * Adds a job processing result and resolves any promises waiting for a matching job in the given final state. + * Note: This method is not exposed on BackgroundJobProcessorSpyInterface, it is intended to be + * a private package method + * + * @param job - The job to be added or updated. + * @param state - Final state of the job. 
+ * @returns void + */ + addJobProcessingResult(job: Job, state: JobFinalState): void { + if (!job.id) return + this.jobProcessingResults.set(job.id, { job, state }) + + if (this.promises.length === 0) return + + const indexes = this.promises.map((promise, index) => { + if (promise.selector(job) && state === promise.awaitedState) { + promise.resolve(job) + return index + } + }) + for (const index of removeNullish(indexes)) { + this.promises.splice(index, 1) + } + } +} diff --git a/packages/app/background-jobs-common/src/background-job-processor/processors/spy/types.ts b/packages/app/background-jobs-common/src/background-job-processor/processors/spy/types.ts new file mode 100644 index 00000000..f4066ace --- /dev/null +++ b/packages/app/background-jobs-common/src/background-job-processor/processors/spy/types.ts @@ -0,0 +1,22 @@ +import type { Job } from 'bullmq' + +import type { JobFinalState } from '../../types' + +export type JobDataSelector = (jobData: JobData) => boolean +export type JobSpyResult = Pick< + Job, + 'data' | 'attemptsMade' | 'id' | 'progress' +> + +export interface BackgroundJobProcessorSpyInterface { + clear(): void + waitForJob( + jobSelector: JobDataSelector, + state: JobFinalState, + ): Promise> + + waitForJobWithId( + id: string | undefined, + awaitedState: JobFinalState, + ): Promise> +} diff --git a/packages/app/background-jobs-common/src/background-job-processor/types.ts b/packages/app/background-jobs-common/src/background-job-processor/types.ts new file mode 100644 index 00000000..f3db7efe --- /dev/null +++ b/packages/app/background-jobs-common/src/background-job-processor/types.ts @@ -0,0 +1,59 @@ +import type { ErrorReporter } from '@lokalise/node-core' +import type { Job, FinishedStatus, Queue, QueueOptions, Worker, WorkerOptions } from 'bullmq' +import type Redis from 'ioredis' +import type { Logger } from 'pino' + +import type { AbstractBullmqFactory } from './processors/factories/AbstractBullmqFactory' + +export type JobFinalState = FinishedStatus + +export type BackgroundJobProcessorConfig< + QueueOptionsType extends QueueOptions, + WorkerOptionsType extends WorkerOptions, +> = { + queueId: string + isTest: boolean + queueOptions?: Partial + workerOptions: Partial +} + +export type TransactionObservabilityManager = { + start: (transactionSpanId: string) => unknown + stop: (transactionSpanId: string) => unknown +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type BullmqProcessor, T = any, R = any, N extends string = string> = ( + job: J, + token?: string, +) => Promise + +export type BackgroundJobProcessorDependencies< + JobPayload extends object, + JobReturn = void, + JobType extends Job = Job, + QueueType extends Queue = Queue, + QueueOptionsType extends QueueOptions = QueueOptions, + WorkerType extends Worker = Worker, + WorkerOptionsType extends WorkerOptions = WorkerOptions, + ProcessorType extends BullmqProcessor = BullmqProcessor< + JobType, + JobPayload, + JobReturn + >, +> = { + redis: Redis + transactionObservabilityManager: TransactionObservabilityManager + logger: Logger + errorReporter: ErrorReporter + bullmqFactory: AbstractBullmqFactory< + QueueType, + QueueOptionsType, + WorkerType, + WorkerOptionsType, + ProcessorType, + JobType, + JobPayload, + JobReturn + > +} diff --git a/packages/app/background-jobs-common/src/background-job-processor/utils.ts b/packages/app/background-jobs-common/src/background-job-processor/utils.ts new file mode 100644 index 00000000..a09717f9 --- /dev/null +++ 
b/packages/app/background-jobs-common/src/background-job-processor/utils.ts @@ -0,0 +1,10 @@ +import type { Job } from 'bullmq' + +export const daysToSeconds = (days: number): number => days * 24 * 60 * 60 + +export const daysToMilliseconds = (days: number): number => daysToSeconds(days) * 1000 + +export const resolveJobId = (job?: Job): string => job?.id ?? 'unknown' + +export const isStalledJobError = (error: Error): boolean => + error.message === 'job stalled more than allowable limit' diff --git a/packages/app/background-jobs-common/src/index.ts b/packages/app/background-jobs-common/src/index.ts new file mode 100644 index 00000000..97449d5a --- /dev/null +++ b/packages/app/background-jobs-common/src/index.ts @@ -0,0 +1 @@ +export * from './background-job-processor' diff --git a/packages/app/background-jobs-common/test/dependencyMocks.spec.ts b/packages/app/background-jobs-common/test/dependencyMocks.spec.ts new file mode 100644 index 00000000..77b76612 --- /dev/null +++ b/packages/app/background-jobs-common/test/dependencyMocks.spec.ts @@ -0,0 +1,21 @@ +import type { Redis } from 'ioredis' +import { describe, beforeAll, afterAll, it, expect } from 'vitest' + +import { DependencyMocks } from './dependencyMocks' + +describe('DependencyMocks', () => { + let mocks: DependencyMocks + let redis: Redis + beforeAll(() => { + mocks = new DependencyMocks() + ;({ redis } = mocks.create()) + }) + afterAll(async () => { + await mocks.dispose() + }) + + it('should start redis server', async () => { + expect(redis).toBeDefined() + expect(await redis.ping()).toBe('PONG') + }) +}) diff --git a/packages/app/background-jobs-common/test/dependencyMocks.ts b/packages/app/background-jobs-common/test/dependencyMocks.ts new file mode 100644 index 00000000..3581c858 --- /dev/null +++ b/packages/app/background-jobs-common/test/dependencyMocks.ts @@ -0,0 +1,77 @@ +import { globalLogger } from '@lokalise/node-core' +import { Redis } from 'ioredis' +import { MockInstance, vi, vitest } from 'vitest' + +import type { BackgroundJobProcessorDependencies } from '../src' +import { CommonBullmqFactory } from '../src/background-job-processor/processors/factories/CommonBullmqFactory' + +const MAX_DB_INDEX = 16 // Redis supports up to 16 logical databases + +let db = 0 + +const testLogger = globalLogger +export let lastInfoSpy: MockInstance +export let lastErrorSpy: MockInstance + +export class DependencyMocks { + private client?: Redis + + create(): BackgroundJobProcessorDependencies { + // eslint-disable-next-line @typescript-eslint/unbound-method + const originalChildFn = testLogger.child + + const originalMethodSpy = vitest.spyOn(testLogger, 'child') + originalMethodSpy.mockImplementation((...args) => { + const childLogger = originalChildFn.apply(testLogger, args) + lastInfoSpy = vitest.spyOn(childLogger, 'info') + lastErrorSpy = vitest.spyOn(childLogger, 'error') + return childLogger + }) + + return { + redis: this.startRedis(), + bullmqFactory: new CommonBullmqFactory(), + transactionObservabilityManager: { + start: vi.fn(), + stop: vi.fn(), + } as any, + logger: testLogger, + errorReporter: { + report: vi.fn(), + } as any, + } + } + + async dispose(): Promise { + await this.client?.flushall('SYNC') + await this.client?.quit() + } + + private startRedis(): Redis { + // Increment DB to avoid duplicates/overlap. Each run should have its own DB. + db++ + const host = process.env.REDIS_HOST + const port = process.env.REDIS_PORT ? 
Number(process.env.REDIS_PORT) : undefined + const username = process.env.REDIS_USERNAME + const password = process.env.REDIS_PASSWORD + const connectTimeout = process.env.REDIS_CONNECT_TIMEOUT + ? parseInt(process.env.REDIS_CONNECT_TIMEOUT, 10) + : undefined + const commandTimeout = process.env.REDIS_COMMAND_TIMEOUT + ? parseInt(process.env.REDIS_COMMAND_TIMEOUT, 10) + : undefined + this.client = new Redis({ + host, + db: db % MAX_DB_INDEX, + port, + username, + password, + connectTimeout, + commandTimeout, + maxRetriesPerRequest: null, + enableReadyCheck: false, + }) + + return this.client + } +} diff --git a/packages/app/background-jobs-common/test/processors/TestFailingBackgroundJobProcessor.ts b/packages/app/background-jobs-common/test/processors/TestFailingBackgroundJobProcessor.ts new file mode 100644 index 00000000..0695a66e --- /dev/null +++ b/packages/app/background-jobs-common/test/processors/TestFailingBackgroundJobProcessor.ts @@ -0,0 +1,60 @@ +import { Job } from 'bullmq' +import { Logger } from 'pino' + +import { BackgroundJobProcessorDependencies, FakeBackgroundJobProcessor } from '../../src' + +type TestFailingBackgroundJobProcessorData = { + id?: string +} + +export class TestFailingBackgroundJobProcessor< + T extends TestFailingBackgroundJobProcessorData, +> extends FakeBackgroundJobProcessor { + private _errorsOnProcess: Error[] = [] + private _errorsToThrowOnProcess: Error[] = [] + private _errorToThrowOnFailed: Error | undefined + + public lastLogger: Logger | undefined + + constructor( + dependencies: BackgroundJobProcessorDependencies, + queueName: string, + isTest = true, + ) { + super(dependencies, queueName, isTest) + } + + protected override async process(job: Job): Promise { + await super.process(job) + const attempt = job.attemptsMade + if (this._errorsToThrowOnProcess.length >= attempt) { + throw this._errorsToThrowOnProcess[attempt] ?? 
new Error('Error has happened') + } + } + + protected resolveExecutionLogger(jobId: string): Logger { + const logger = super.resolveExecutionLogger(jobId) + this.lastLogger = logger + return logger + } + + set errorsToThrowOnProcess(errors: Error[]) { + this._errorsToThrowOnProcess = errors + } + + set errorToThrowOnFailed(error: Error | undefined) { + this._errorToThrowOnFailed = error + } + + get errorsOnProcess(): Error[] { + return this._errorsOnProcess + } + + protected override async onFailed(job: Job, error: Error) { + await super.onFailed(job, error) + this._errorsOnProcess.push(error) + if (this._errorToThrowOnFailed) { + throw this._errorToThrowOnFailed + } + } +} diff --git a/packages/app/background-jobs-common/test/processors/TestStalledBackgroundJobProcessor.ts b/packages/app/background-jobs-common/test/processors/TestStalledBackgroundJobProcessor.ts new file mode 100644 index 00000000..0a2712ae --- /dev/null +++ b/packages/app/background-jobs-common/test/processors/TestStalledBackgroundJobProcessor.ts @@ -0,0 +1,61 @@ +import { Job } from 'bullmq' +import { Logger } from 'pino' + +import { AbstractBackgroundJobProcessor, BackgroundJobProcessorDependencies } from '../../src' + +type Data = { + id?: string +} + +type OnFailedError = { + error: Error + job: Job +} + +export class TestStalledBackgroundJobProcessor extends AbstractBackgroundJobProcessor { + private _onFailedErrors: OnFailedError[] = [] + public lastLogger: Logger | undefined + + constructor(dependencies: BackgroundJobProcessorDependencies) { + super(dependencies, { + queueId: 'TestStalledBackgroundJobProcessor queue', + isTest: false, // We don't want to override job options for this processor + workerOptions: { + lockDuration: 1, + stalledInterval: 1, + }, + }) + } + + async schedule(jobData: Data): Promise { + return super.schedule(jobData, { + attempts: 1, + backoff: { type: 'fixed', delay: 1 }, + removeOnComplete: true, + removeOnFail: 1, // we should keep the job in the queue to test the stalled job behavior + }) + } + + protected override async process(): Promise { + return new Promise((resolve) => setTimeout(resolve, 1000)) + } + + protected resolveExecutionLogger(jobId: string): Logger { + const logger = super.resolveExecutionLogger(jobId) + this.lastLogger = logger + return logger + } + + protected override onFailed(job: Job, error: Error): Promise { + this._onFailedErrors.push({ job, error }) + return Promise.resolve() + } + + get onFailedErrors(): OnFailedError[] { + return this._onFailedErrors + } + + clean(): void { + this._onFailedErrors = [] + } +} diff --git a/packages/app/background-jobs-common/test/setup.ts b/packages/app/background-jobs-common/test/setup.ts new file mode 100644 index 00000000..94afe8b0 --- /dev/null +++ b/packages/app/background-jobs-common/test/setup.ts @@ -0,0 +1 @@ +process.loadEnvFile('./.env.test') diff --git a/packages/app/background-jobs-common/tsconfig.json b/packages/app/background-jobs-common/tsconfig.json new file mode 100644 index 00000000..b0199dfa --- /dev/null +++ b/packages/app/background-jobs-common/tsconfig.json @@ -0,0 +1,27 @@ +{ + "compilerOptions": { + "outDir": "dist", + "module": "ESNext", + "target": "ES2022", + "lib": ["ES2022", "dom"], + "sourceMap": false, + "declaration": true, + "declarationMap": false, + "types": ["vitest/globals"], + "skipLibCheck": true, + "strict": true, + "moduleResolution": "node", + "noUnusedLocals": false, + "noUnusedParameters": false, + "noFallthroughCasesInSwitch": true, + "strictNullChecks": true, + "importHelpers": 
true, + "baseUrl": "./", + "allowSyntheticDefaultImports": true, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true + }, + "include": ["src/**/*"], + "exclude": ["src/**/*.spec.ts", "src/**/*.test.ts"] +} diff --git a/packages/app/background-jobs-common/tsconfig.lint.json b/packages/app/background-jobs-common/tsconfig.lint.json new file mode 100644 index 00000000..e7532723 --- /dev/null +++ b/packages/app/background-jobs-common/tsconfig.lint.json @@ -0,0 +1,5 @@ +{ + "extends": ["./tsconfig.json"], + "include": ["src/**/*", "test/**/*", "vitest.config.mts"], + "exclude": [] +} diff --git a/packages/app/background-jobs-common/vitest.config.mts b/packages/app/background-jobs-common/vitest.config.mts new file mode 100644 index 00000000..b06c93f3 --- /dev/null +++ b/packages/app/background-jobs-common/vitest.config.mts @@ -0,0 +1,31 @@ +import { resolve } from 'path' + +import defineConfig from '@lokalise/package-vite-config/package' + +import packageJson from './package.json' + +/* eslint-disable import/no-default-export */ +export default defineConfig({ + entry: resolve(__dirname, 'src/index.ts'), + dependencies: Object.keys(packageJson.dependencies), + test: { + setupFiles: ['./test/setup.ts'], + hookTimeout: 60000, + restoreMocks: true, + poolOptions: { + threads: { + singleThread: true, + isolate: false, + }, + }, + coverage: { + all: false, + thresholds: { + lines: 98, + functions: 100, + branches: 90, + statements: 98, + }, + }, + }, +}) diff --git a/packages/app/error-utils/package.json b/packages/app/error-utils/package.json index e3b53712..dbd66039 100644 --- a/packages/app/error-utils/package.json +++ b/packages/app/error-utils/package.json @@ -26,7 +26,7 @@ "clean": "rimraf dist .eslintcache", "lint": "eslint --cache --max-warnings=0 && prettier --check --log-level warn src \"**/*.{json,md}\" && tsc --noEmit", "lint:fix": "eslint --fix && prettier --write src \"**/*.{json,md}\"", - "test": "vitest run --coverage", + "test:ci": "vitest run --coverage", "prepublishOnly": "npm run build", "package-version": "echo $npm_package_version" }, diff --git a/packages/app/events-common/package.json b/packages/app/events-common/package.json index 98e9eb0c..34dbd283 100644 --- a/packages/app/events-common/package.json +++ b/packages/app/events-common/package.json @@ -26,8 +26,7 @@ "clean": "rimraf dist .eslintcache", "lint": "eslint --cache --max-warnings=0 . && prettier --check --log-level warn src test \"**/*.{json,md}\" && tsc --noEmit", "lint:fix": "eslint . 
--fix && prettier --write src test \"**/*.{json,md}\"", - "test": "vitest run --coverage", - "pretest:ci": "cross-env NODE_ENV=test npm run build", + "test:ci": "vitest run --coverage", "prepublishOnly": "npm run build", "package-version": "echo $npm_package_version" }, diff --git a/packages/app/id-utils/package.json b/packages/app/id-utils/package.json index b9c75f76..3dc7bade 100644 --- a/packages/app/id-utils/package.json +++ b/packages/app/id-utils/package.json @@ -26,7 +26,7 @@ "clean": "rimraf dist .eslintcache", "lint": "eslint --cache --max-warnings=0 && prettier --check --log-level warn src \"**/*.{json,md}\" && tsc --noEmit", "lint:fix": "eslint --fix && prettier --write src \"**/*.{json,md}\"", - "test": "vitest run --coverage", + "test:ci": "vitest run --coverage", "prepublishOnly": "npm run build", "package-version": "echo $npm_package_version" }, diff --git a/packages/app/non-translatable-markup/package.json b/packages/app/non-translatable-markup/package.json index 7d4866e7..3a7ed1ed 100644 --- a/packages/app/non-translatable-markup/package.json +++ b/packages/app/non-translatable-markup/package.json @@ -29,7 +29,7 @@ "clean": "rimraf dist .eslintcache", "lint": "eslint --cache --max-warnings=0 . && prettier --check --log-level warn src test \"**/*.{json,md}\" && tsc --noEmit", "lint:fix": "eslint . --fix && prettier --write src test \"**/*.{json,md}\"", - "test": "vitest run --coverage", + "test:ci": "vitest run --coverage", "prepublishOnly": "npm run build", "package-version": "echo $npm_package_version" }, diff --git a/packages/app/supported-languages/package.json b/packages/app/supported-languages/package.json index 98a53a11..b4dba5f8 100644 --- a/packages/app/supported-languages/package.json +++ b/packages/app/supported-languages/package.json @@ -32,7 +32,7 @@ "clean": "rimraf dist .eslintcache", "lint": "eslint --cache --max-warnings=0 && prettier --check --log-level warn src \\\"**/*.{json,md}\\\" && tsc --noEmit", "lint:fix": "eslint --fix && prettier --write src \\\"**/*.{json,md}\\\"", - "test": "vitest run --coverage", + "test:ci": "vitest run --coverage", "prepublishOnly": "npm run build", "package-version": "echo $npm_package_version" }, diff --git a/packages/dev/package-vite-config/package.json b/packages/dev/package-vite-config/package.json index 303061ff..e56977fc 100644 --- a/packages/dev/package-vite-config/package.json +++ b/packages/dev/package-vite-config/package.json @@ -34,8 +34,8 @@ "package-version": "echo $npm_package_version" }, "dependencies": { - "vite": "^5.0.11", - "vite-plugin-dts": "^3.6.0", + "vite": "5.0.11", + "vite-plugin-dts": "3.6.0", "vitest": "^1.4.0" }, "devDependencies": {
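The `daysToSeconds` / `daysToMilliseconds` helpers in `utils.ts` exist because BullMQ mixes units: `KeepJobs.age` (used by `removeOnComplete` / `removeOnFail`) is expressed in seconds, while delays and backoffs are in milliseconds. A hedged usage sketch follows; it assumes the helpers are re-exported from the package root under the `@lokalise/background-jobs-common` name, and the retention numbers are illustrative only.

```ts
import type { JobsOptions } from 'bullmq'

// Import specifier and re-export of the utils are assumptions.
import { daysToMilliseconds, daysToSeconds } from '@lokalise/background-jobs-common'

// Keep completed jobs for 3 days and failed jobs for 7 days. BullMQ's KeepJobs.age
// is in seconds, while backoff delays are in milliseconds.
const defaultJobOptions: JobsOptions = {
  removeOnComplete: { age: daysToSeconds(3) },
  removeOnFail: { age: daysToSeconds(7) },
  backoff: { type: 'fixed', delay: daysToMilliseconds(1) / 24 }, // 1 hour
}

console.log(defaultJobOptions.removeOnComplete) // { age: 259200 }
```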
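`TestStalledBackgroundJobProcessor` provokes BullMQ's stall handling on purpose: its `process()` sleeps for a full second while `lockDuration` and `stalledInterval` are 1 ms, so the worker loses the job lock almost immediately and the stall checker fails the job with the exact message that `isStalledJobError` in `utils.ts` matches. A small sketch of the relevant knobs; the values are copied from the test processor above, and the `stallProvokingWorkerOptions` name is made up.

```ts
import type { WorkerOptions } from 'bullmq'

// The job lock expires after 1 ms and the stall checker also runs every 1 ms, so a
// handler that takes longer than that is flagged as stalled. Once the allowed stall
// count is exceeded, BullMQ fails the job with the message
// 'job stalled more than allowable limit', which is the string isStalledJobError checks.
const stallProvokingWorkerOptions: Partial<WorkerOptions> = {
  lockDuration: 1,
  stalledInterval: 1,
}

console.log(stallProvokingWorkerOptions)
```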
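Finally, the test bootstrap order: `vitest.config.mts` lists `./test/setup.ts` under `setupFiles`, so `process.loadEnvFile` populates `process.env` from `.env.test` before `DependencyMocks.startRedis()` reads `REDIS_HOST`, `REDIS_PORT` and the other connection settings. A minimal sketch of that ordering; the logged field names come from the variables `dependencyMocks.ts` reads, and the logging itself is illustrative.

```ts
// Runs before every suite because it is listed in the vitest setupFiles array.
// process.loadEnvFile requires Node.js >= 20.12 (or >= 21.7), where it was introduced.
process.loadEnvFile('./.env.test')

// Anything that runs afterwards (for example DependencyMocks.startRedis) can now
// rely on the Redis connection settings being present.
console.log({
  host: process.env.REDIS_HOST,
  port: process.env.REDIS_PORT,
  connectTimeout: process.env.REDIS_CONNECT_TIMEOUT,
})
```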