This section will cover how to write tests for the moderation service using Jest. We will start by writing unit tests for the `ContentGuardService` and `ModerationsService`. Then, we will write end-to-end tests for the moderation service using the `TicketsMSController` and `ModerationsController`.
Note: Don't forget to have a look at the NestJS Testing documentation.
Jest uses CommonJS by default, but we can use ESM (ECMAScript Modules) by modifying the Jest and Typescript configuration files generated by Nx.
// apps/moderation/tsconfig.spec.json
{
"extends": "./tsconfig.json",
"compilerOptions": {
"outDir": "../../dist/out-tsc",
"moduleResolution": "node",
"module": "esnext",
"target": "es2022",
"emitDecoratorMetadata": true,
"allowJs": true,
"types": ["jest", "node"]
},
"include": [
"**/*.ts",
"**/*.mock.ts",
"**/*.spec.ts",
"**/*.e2e-spec.ts",
"**/*.d.ts",
"jest.config.ts",
"jest.setup.ts",
"jest.teardown.ts"
]
}
Note: The `module` option is set to `esnext` and the `moduleResolution` option is set to `node` to enable ESM support.
// apps/moderation/jest.config.ts
// Jest configuration used by the `test` target of the moderation app (unit tests).
export default {
displayName: "moderation",
globals: {},
testEnvironment: "node",
transform: {
// Every .ts file goes through ts-jest, compiled with the spec tsconfig and emitted as ESM.
"^.+\\.ts$": [
"ts-jest",
{
tsconfig: "<rootDir>/tsconfig.spec.json",
useESM: true,
},
],
},
// "mjs" is listed so that .mjs files are resolved as well.
moduleFileExtensions: ["ts", "mjs", "js", "html"],
// Treat .ts files as ES modules rather than CommonJS.
extensionsToTreatAsEsm: [".ts"],
coverageDirectory: "../../coverage/apps/moderation",
preset: "../../jest.preset.js",
};
Note: The `useESM` option is set to `true` to enable ESM support in the `ts-jest` transformer. The `extensionsToTreatAsEsm` option is set to `['.ts']` to treat `.ts` files as ESM. The `moduleFileExtensions` option is set to `['ts', 'mjs', 'js', 'html']` to include `.mjs` files.
Finally, before running the tests, we must set the --experimental-vm-modules
flag to enable ESM support in Node.js when using node < 18.19.
export NODE_OPTIONS="$NODE_OPTIONS --experimental-vm-modules"
Unit testing is the process of testing individual units or components of software. It verifies that each unit of the software performs as designed. They provide a strict, written contract that the piece of code must satisfy and should give you the confidence to make changes. When well thought out, unit tests can become highly valuable because they are (usually) the most stable, the fastest to run, and the easiest to write. In best-case scenarios, they can even improve the design of your code.
What makes a good unit test?
- Small: Unit tests should be small. They should test a single piece of functionality using a single scenario (Given-When-Then).
- Fast: Unit tests should be quick to run. They should not take more than a few seconds to run.
- Isolated: Unit tests should not depend on any external resources. They should not rely on a database, network, or other external resource.
- Repeatable: Unit tests should be repeatable in any environment. They should produce the same result every time they run.
Note: You can run the unit tests with the following command:
nx run moderation:test
As you can see in the snippet below, NestJS provides a `TestingModule` that allows us to create a module for testing purposes by calling `Test.createTestingModule`. This module is similar to the regular module but specifically designed for testing.
When the module is compiled by calling `compile` on the builder returned by `Test.createTestingModule`, we can use the `get` method to get an instance of the service we want to test. We can then use this instance to call the methods to test and make assertions about the results.
When the provider we want to test has dependencies, we can use the `useValue` property to provide a mock implementation of the dependency. This approach allows us to control the behavior of the dependency, spy on the dependency, and ensure that the test is isolated from the behavior of the dependency.
This test suite includes tests for the `matchesDictionary` and `isFlagged` methods. The `isFlagged` test uses a Jest spy to mock the `post` method of the `HttpService`.
Solution
// apps/moderation/src/app/content-guard/content-guard.service.spec.ts
/* eslint-disable max-nested-callbacks */
/* eslint-disable max-lines-per-function */
import { beforeEach, describe, expect, it, jest } from "@jest/globals";
import { HttpService } from "@nestjs/axios";
import { Test } from "@nestjs/testing";
import { AxiosResponse } from "axios";
import { of } from "rxjs";
import { ContentGuardModuleOptions } from "./content-guard.interfaces";
import {
ContentGuardService,
IOpenAIModerationResponse,
} from "./content-guard.service";
/**
 * Unit tests for ContentGuardService.
 *
 * The service is instantiated through a Nest testing module in which the
 * HttpService is replaced by a stub exposing a mocked `post`, and the module
 * options are replaced by a fixed dictionary and API key.
 */
describe("ContentGuardService", () => {
  let service: ContentGuardService;
  let httpService: HttpService;

  // A fresh module per test keeps mock state from leaking between cases.
  beforeEach(async () => {
    const testingModule = await Test.createTestingModule({
      providers: [
        ContentGuardService,
        { provide: HttpService, useValue: { post: jest.fn() } },
        {
          provide: ContentGuardModuleOptions,
          useValue: {
            dictionary: { en: ["test"] },
            openAIApiKey: "test-key",
          },
        },
      ],
    }).compile();

    service = testingModule.get(ContentGuardService);
    httpService = testingModule.get(HttpService);
  });

  it("should be defined", () => {
    expect(service).toBeDefined();
  });

  describe("matchesDictionary", () => {
    it("when a value is present in the dictionary it should be a match", () => {
      const matched = service.matchesDictionary("test");
      expect(matched).toBe(true);
    });

    it("when a value is missing from the dictionary it should NOT be a match", () => {
      const matched = service.matchesDictionary("no-match");
      expect(matched).toBe(false);
    });
  });

  describe("isFlagged", () => {
    it("it should return first result from OpenAI API response", async () => {
      // Arrange: a canned moderation payload with a single flagged result.
      const mockResponse: IOpenAIModerationResponse = {
        id: "modr-test",
        model: "text-moderation-stable",
        results: [
          {
            flagged: true,
            categories: {
              sexual: false,
              hate: false,
              harassment: false,
              "self-harm": false,
              "sexual/minors": false,
              "hate/threatening": false,
              "violence/graphic": false,
              "self-harm/intent": false,
              "self-harm/instructions": false,
              "harassment/threatening": false,
              violence: false,
            },
            category_scores: {
              sexual: 0,
              hate: 0,
              harassment: 0,
              "self-harm": 0,
              "sexual/minors": 0,
              "hate/threatening": 0,
              "violence/graphic": 0,
              "self-harm/intent": 0,
              "self-harm/instructions": 0,
              "harassment/threatening": 0,
              violence: 0,
            },
          },
        ],
      };
      const axiosResponse = {
        data: mockResponse,
        status: 200,
        statusText: "OK",
        headers: {},
        config: {},
      } as AxiosResponse<IOpenAIModerationResponse>;
      jest
        .spyOn(httpService, "post")
        .mockImplementationOnce(() => of(axiosResponse));
      const input = "test-content";

      // Act
      const result = await service.isFlagged(input);

      // Assert: the first result is returned and the HTTP call has the expected shape.
      const expectedUrl = "https://api.openai.com/v1/moderations";
      const expectedBody = { model: "text-moderation-stable", input };
      const expectedRequestConfig = {
        headers: {
          "Content-Type": "application/json",
          Authorization: expect.any(String),
        },
      };
      expect(result).toEqual(mockResponse.results[0]);
      expect(httpService.post).toHaveBeenCalledWith(
        expectedUrl,
        expectedBody,
        expectedRequestConfig,
      );
    });
  });
});
Note:
- The `jest.fn()` method creates a mock function. We can use this mock function to spy on the behavior of the `HttpService` and ensure that the `post` method is called with the correct arguments.
- The `TestingModule.get` method is used to get an instance of the `ContentGuardService` and `HttpService` from the module and use them to call the methods to test and make assertions about the results.
- It is a NestJS convention to store the unit tests at the same level as the service being tested.
The `ModerationsService` is more complex to test because it has many dependencies and uses events and queues, so we will use the auto-mocking feature of NestJS to mock the dependencies instead of declaring custom providers using `useValue: Mocked<T>`.
Given that the `InjectionToken` is a string, when you use `createMock` from `@golevelup/ts-jest` or `ModuleMocker` from `jest-mock`, you must still guide the helper function to pick the correct type for the mock. It is easier to accomplish with `createMock` because it can infer the value from the type passed as a generic parameter.
yarn add @golevelup/ts-jest
Solution
// apps/moderation/src/app/moderations/moderations.service.spec.ts
/* eslint-disable max-lines-per-function */
/* eslint-disable max-nested-callbacks */
import { OryRelationshipsService } from "@getlarge/keto-client-wrapper";
import { createMock, DeepMocked } from "@golevelup/ts-jest";
import { beforeEach, describe, it, jest } from "@jest/globals";
import { getQueueToken } from "@nestjs/bullmq";
import { CACHE_MANAGER } from "@nestjs/cache-manager";
import { NotFoundException } from "@nestjs/common";
import { EventEmitter2 } from "@nestjs/event-emitter";
import { getModelToken } from "@nestjs/mongoose";
import { Test } from "@nestjs/testing";
import { AcceptableError } from "@ticketing/shared/errors";
import {
Moderation,
ModerationStatus,
ModerationTicket,
TicketStatus,
} from "@ticketing/shared/models";
import { Queue } from "bullmq";
import { Cache } from "cache-manager";
import { ClientSession, Document, Model, Types } from "mongoose";
import {
TICKET_APPROVED_EVENT,
TICKET_REJECTED_EVENT,
TicketCreatedEvent,
} from "../shared/events";
import { QueueNames } from "../shared/queues";
import { ModerationsService } from "./moderations.service";
import { Moderation as ModerationSchema, ModerationDocument } from "./schemas";
// For some reason the `Symbol.asyncDispose` polyfill is missing when running with Jest,
// so install a stand-in symbol before the code under test is loaded.
// Only define it when absent: on runtimes that already expose `Symbol.asyncDispose`
// the property is non-configurable, and redefining it would throw a TypeError.
if (!("asyncDispose" in Symbol)) {
  Object.defineProperty(Symbol, "asyncDispose", {
    value: Symbol("Symbol.asyncDispose"),
  });
}
// Unit tests for ModerationsService.
// Every dependency of the service is auto-mocked through `useMocker`, then
// retrieved from the testing module so each test can arrange and inspect it.
describe("ModerationsService", () => {
let service: ModerationsService;
let moderationModel: DeepMocked<Model<ModerationDocument>>;
let cacheManager: DeepMocked<Cache>;
let moderationQueue: DeepMocked<Queue>;
let eventEmitter: DeepMocked<EventEmitter2>;
let oryRelationshipsService: DeepMocked<OryRelationshipsService>;
beforeEach(async () => {
const module = await Test.createTestingModule({
providers: [ModerationsService],
})
.useMocker((token) => {
// String/symbol tokens carry no type information, so each known token is
// mapped explicitly to a mock of the matching type; unknown ones get null.
if (typeof token === "string" || typeof token === "symbol") {
switch (token) {
case getModelToken(Moderation.name):
return createMock<Model<ModerationDocument>>();
case getQueueToken(QueueNames.MODERATE_TICKET):
return createMock<Queue>();
case CACHE_MANAGER:
return createMock<Cache>();
default:
return null;
}
}
// Function tokens (e.g. injected classes) get a generic deep mock.
if (typeof token === "function") {
return createMock<typeof token>();
}
})
.compile();
service = module.get(ModerationsService);
moderationModel = module.get(getModelToken(Moderation.name));
moderationQueue = module.get(getQueueToken(QueueNames.MODERATE_TICKET));
eventEmitter = module.get(EventEmitter2);
oryRelationshipsService = module.get(OryRelationshipsService);
cacheManager = module.get(CACHE_MANAGER);
});
// Reset recorded calls so assertions in one test never see calls from another.
afterEach(() => {
jest.clearAllMocks();
});
it("should be defined", () => {
expect(service).toBeDefined();
expect(moderationModel).toBeDefined();
expect(moderationQueue).toBeDefined();
expect(eventEmitter).toBeDefined();
expect(oryRelationshipsService).toBeDefined();
expect(cacheManager).toBeDefined();
});
describe("find", () => {
it("should return a list of moderations", async () => {
const result: Moderation[] = [];
moderationModel.find.mockResolvedValueOnce(result);
// act & assert
expect(await service.find()).toEqual(result);
});
});
describe("findById", () => {
it("should return a moderation if it exists", async () => {
const result = {
id: "1",
status: ModerationStatus.Approved,
};
// The lookup is stubbed at the cache layer (`cacheManager.wrap`).
cacheManager.wrap.mockResolvedValueOnce(result);
// act & assert
expect(await service.findById("1")).toEqual(result);
});
it("should throw an error if the moderation does not exist", async () => {
cacheManager.wrap.mockRejectedValueOnce(new NotFoundException());
// act & assert
await expect(service.findById("1")).rejects.toThrow(NotFoundException);
});
});
describe("updateById", () => {
it("should update a moderation and return the updated moderation", async () => {
const expectedStatus = ModerationStatus.Approved;
const result = { ...createMock<Moderation>() };
result.id = "1";
result.status = expectedStatus;
// Mocked document: `save` resolves to itself and `toJSON` yields the expected moderation.
const doc = createMock<ModerationSchema & Document>();
doc.save.mockResolvedValueOnce(doc);
doc.toJSON.mockReturnValueOnce(result);
moderationModel.findOne.mockResolvedValueOnce(doc);
// act & assert
expect(await service.updateById("1", { status: expectedStatus })).toEqual(
result,
);
expect(moderationModel.findOne).toHaveBeenCalledWith({ _id: "1" });
expect(doc.set).toHaveBeenCalledWith({
status: ModerationStatus.Approved,
});
expect(doc.save).toHaveBeenCalled();
});
});
describe("approveById", () => {
it("should approve a moderation and return the approved moderation", async () => {
const result = { ...createMock<Moderation>() };
result.id = "1";
result.status = ModerationStatus.Approved;
// `findById` is stubbed on the service itself so only the approval logic runs.
jest.spyOn(service, "findById").mockResolvedValueOnce(result);
eventEmitter.emitAsync.mockResolvedValueOnce(undefined);
// act & assert
expect(await service.approveById("1")).toEqual(result);
expect(eventEmitter.emitAsync).toHaveBeenCalledWith(
TICKET_APPROVED_EVENT,
{
moderation: result,
ticket: { ...result.ticket, status: TicketStatus.Approved },
ctx: {},
},
);
});
});
describe("rejectById", () => {
it("should reject a moderation and return the rejected moderation", async () => {
const rejectionReason = "reason";
const result = { ...createMock<Moderation>() };
result.id = "1";
result.status = ModerationStatus.Rejected;
result.rejectionReason = rejectionReason;
jest.spyOn(service, "findById").mockResolvedValueOnce(result);
eventEmitter.emitAsync.mockResolvedValueOnce(undefined);
// act & assert
expect(await service.rejectById("1", rejectionReason)).toEqual(result);
expect(eventEmitter.emitAsync).toHaveBeenCalledWith(
TICKET_REJECTED_EVENT,
{
moderation: result,
ticket: { ...result.ticket, status: TicketStatus.Rejected },
ctx: {},
},
);
});
});
describe("onTicketCreated", () => {
it("should throw when a moderation is already pending", async () => {
const ticketCreationEvent = {
...createMock<TicketCreatedEvent>(),
ticket: { ...createMock<ModerationTicket>(), id: "1" },
};
const existingModeration = createMock<ModerationSchema & Document>();
const mockedSession = createMock<ClientSession>();
// Run the transaction callback with the mocked session, then resolve with the document.
mockedSession.withTransaction.mockImplementationOnce(async (fn) => {
await fn(mockedSession);
return Promise.resolve(existingModeration);
});
moderationModel.startSession.mockResolvedValueOnce(mockedSession);
// Make the `findOne(...).session(...)` chain resolve to an existing moderation.
moderationModel
.findOne({
"ticket.$id": ticketCreationEvent.ticket.id,
})
.session.mockResolvedValueOnce(existingModeration);
// act & assert
await expect(
service.onTicketCreated(ticketCreationEvent),
).rejects.toThrow(AcceptableError);
expect(moderationModel.startSession).toHaveBeenCalled();
expect(moderationModel.findOne).toHaveBeenCalledWith({
"ticket.$id": ticketCreationEvent.ticket.id,
});
expect(
moderationModel.findOne({ "ticket.$id": ticketCreationEvent.ticket.id })
.session,
).toHaveBeenCalledWith(mockedSession);
});
it("should create a moderation, relations with the admin group and a background job inside a Mongo transaction", async () => {
const ticketCreationEvent = {
...createMock<TicketCreatedEvent>(),
ticket: {
...createMock<ModerationTicket>(),
// A valid hex ObjectId is required because the assertion below converts it back with createFromHexString.
id: new Types.ObjectId().toHexString(),
},
};
const createdModerationDoc = createMock<ModerationSchema & Document>();
const createdModeration = { ...createMock<Moderation>() };
createdModeration.id = "1";
const mockedSession = createMock<ClientSession>();
mockedSession.withTransaction.mockImplementationOnce(async (fn) => {
await fn(mockedSession);
return Promise.resolve(createdModerationDoc);
});
moderationModel.startSession.mockResolvedValueOnce(mockedSession);
// No existing moderation for this ticket.
moderationModel
.findOne({
"ticket.$id": ticketCreationEvent.ticket.id,
})
.session.mockResolvedValueOnce(undefined);
// `create` is called with an array, so it is stubbed to resolve with an array of documents.
moderationModel.create.mockResolvedValueOnce([
createdModerationDoc,
] as never);
createdModerationDoc.toJSON.mockReturnValueOnce(createdModeration);
// act & assert
await service.onTicketCreated(ticketCreationEvent);
expect(moderationModel.create).toHaveBeenCalledWith(
[
{
ticket: Types.ObjectId.createFromHexString(
ticketCreationEvent.ticket.id,
),
status: ModerationStatus.Pending,
},
],
{ session: mockedSession },
);
expect(oryRelationshipsService.createRelationship).toHaveBeenCalled();
expect(moderationQueue.add).toHaveBeenCalledWith(
"moderate-ticket",
{
ticket: ticketCreationEvent.ticket,
ctx: ticketCreationEvent.ctx,
moderation: createdModeration,
},
{
attempts: 2,
delay: 1000,
jobId: createdModeration.id,
removeOnComplete: true,
removeOnFail: true,
},
);
});
});
});
Note:
- Pay close attention to the `useMocker` method used in the `beforeEach` block. This method allows us to quickly provide a custom mock implementation for the dependencies of the `ModerationsService`.
- The `createMock` method is used to create a mock instance of the `Model`, `Queue`, `Cache`, `EventEmitter2`, and `OryRelationshipsService` classes. It is also used in our tests to arrange the behavior of the mocks and make assertions about the results.
- The `jest.clearAllMocks` method is used in the `afterEach` block to clear all the mock calls between tests.
End-to-end testing aims to verify that interactions between components behave and work together as intended. It is a critical part of the testing process because it ensures the application works as expected in a real-world scenario.
Narrow-scoped end-to-end tests focus on a single module, while broad-scoped end-to-end tests focus on the entire application. In this section, we will focus on narrow end-to-end tests.
In a NestJS application, end-to-end tests allow us to test all the implicit logic of the application, including the routing, middlewares, guards, interceptors, pipes, and error handlers. They also allow us to test the application in a near-production environment, including the database, network, and other external resources (to a certain extent).
It comes with a few challenges, though:
- Complexity: End-to-end tests are more complex than unit tests because all the application components must work together.
- Speed: End-to-end tests are slower than unit tests because they require the entire application to run.
- Fragility: End-to-end tests are more fragile than unit tests because they depend on the entire application being in a specific state.
What makes an excellent end-to-end test?
- Realistic: End-to-end tests should be as realistic as possible. They should simulate real-world scenarios and use real-world data.
- Isolated: End-to-end tests should be isolated from each other. They should not depend on the state of the previous tests.
- Repeatable: End-to-end tests should be repeatable in any environment. They should produce the same result every time they run.
- Stable: End-to-end tests should be stable. They should not fail randomly or produce inconsistent results.
Note: To run the end-to-end tests, you can use the following command:
nx run moderation:e2e
The end-to-end tests will have their own Jest configuration file.
// apps/moderation/jest-e2e.config.ts
// Jest configuration used by the `e2e` target of the moderation app (end-to-end tests).
export default {
displayName: "moderation-e2e",
testEnvironment: "node",
transform: {
// Same ts-jest + ESM pipeline as the unit-test configuration.
"^.+\\.ts$": [
"ts-jest",
{
tsconfig: "<rootDir>/tsconfig.spec.json",
useESM: true,
},
],
},
// Executed once before / after the whole e2e run to prepare and restore the environment.
globalSetup: "<rootDir>/jest.setup.ts",
globalTeardown: "<rootDir>/jest.teardown.ts",
moduleFileExtensions: ["ts", "mjs", "js", "html", "node"],
extensionsToTreatAsEsm: [".ts"],
collectCoverageFrom: ["./src/**/*.(t|j)s"],
coverageDirectory: "../../coverage/apps/moderation-e2e",
// Only files named *.e2e-spec.* or *.test.* (ts/js, optionally tsx/jsx) are picked up.
testMatch: ["**/+(*.)+(e2e-spec|test).+(ts|js)?(x)"],
preset: "../../jest.preset.js",
};
A setup file sets up the environment for the end-to-end tests. It generates the configuration files for the Ory containers and cleans up the database before running the tests.
Solution
// apps/moderation/jest.setup.ts
import { execSync } from "node:child_process";
// eslint-disable-next-line @nx/enforce-module-boundaries
import setup from "../../tools/test/jest.mongo.setup";
const envPath = "apps/moderation/.env.test";
const cwd = process.cwd();

/**
 * Jest global setup for the moderation end-to-end tests.
 *
 * Runs the shared Mongo setup with the test env file (presumably preparing
 * the test database — see tools/test/jest.mongo.setup), then regenerates the
 * Keto and Kratos configuration from `.env.test` and restarts each container
 * so it picks up the test configuration.
 */
export default async (): Promise<void> => {
  // All commands run synchronously from the workspace root with their output discarded.
  const run = (command: string): void => {
    execSync(command, { cwd, stdio: "ignore" });
  };

  await setup(envPath);

  run(
    "npx ts-node --project tools/tsconfig.json tools/ory/generate-config.ts keto -e .env.test",
  );
  run("docker compose restart keto");

  run(
    "npx ts-node --project tools/tsconfig.json tools/ory/generate-config.ts kratos -e .env.test",
  );
  run("docker compose restart kratos");
};
Note: In order for Ory to work properly, we need to generate the configuration files for the Ory containers and restart them before running the end-to-end tests. Essentially, we remove the need for email confirmation and make the storage in-memory for the tests.
A teardown file cleans up the environment after the end-to-end tests have run.
Solution
// apps/moderation/jest.teardown.ts
import mongoose from "mongoose";
import { execSync } from "node:child_process";
const cwd = process.cwd();

/**
 * Jest global teardown for the moderation end-to-end tests.
 *
 * Closes the default Mongoose connection, then regenerates the Keto and
 * Kratos configuration from the regular `.env` file and restarts each
 * container, undoing what the global setup changed.
 */
export default async (): Promise<void> => {
  // All commands run synchronously from the workspace root with their output discarded.
  const run = (command: string): void => {
    execSync(command, { cwd, stdio: "ignore" });
  };

  await mongoose.connection.close();

  run(
    "npx ts-node --project tools/tsconfig.json tools/ory/generate-config.ts keto -e .env",
  );
  run("docker compose restart keto");

  run(
    "npx ts-node --project tools/tsconfig.json tools/ory/generate-config.ts kratos -e .env",
  );
  run("docker compose restart kratos");
};
A new Nx target, e2e, is created to run the end-to-end tests.
// apps/moderation/project.json
{
"name": "moderation",
"$schema": "../../node_modules/nx/schemas/project-schema.json",
"sourceRoot": "apps/moderation/src",
"projectType": "application",
"targets": {
"build": {
"executor": "@nx/webpack:webpack",
"outputs": ["{options.outputPath}"],
"defaultConfiguration": "production",
"options": {
"outputPath": "dist/apps/moderation",
"outputFileName": "main.mjs",
"main": "apps/moderation/src/main.ts",
"tsConfig": "apps/moderation/tsconfig.app.json",
"assets": ["apps/moderation/src/assets"],
"generatePackageJson": true,
"target": "node",
"compiler": "tsc",
"webpackConfig": "apps/moderation/webpack.config.cjs",
"isolatedConfig": true
},
"configurations": {
"development": {},
"production": {}
}
},
"serve": {
"executor": "@nx/js:node",
"defaultConfiguration": "local",
"options": {
"buildTarget": "moderation:build"
},
"configurations": {
"development": {
"buildTarget": "moderation:build:development"
},
"production": {
"buildTarget": "moderation:build:production"
},
"local": {
"buildTarget": "moderation:build:development"
}
}
},
"lint": {
"executor": "@nx/eslint:lint",
"outputs": ["{options.outputFile}"],
"options": {
"lintFilePatterns": ["apps/moderation/**/*.ts"]
}
},
"test": {
"executor": "@nx/jest:jest",
"outputs": ["{workspaceRoot}/coverage/{projectRoot}"],
"options": {
"jestConfig": "apps/moderation/jest.config.ts"
}
},
"e2e": {
"executor": "@nx/jest:jest",
"outputs": ["{workspaceRoot}/coverage/{projectRoot}-e2e"],
"options": {
"jestConfig": "apps/moderation/jest-e2e.config.ts"
}
},
"dotenv-push": {
"executor": "nx:run-commands",
"options": {
"commands": ["cd apps/moderation && dotenv-vault push"]
},
"cwd": ".",
"parallel": false
},
"dotenv-pull": {
"executor": "nx:run-commands",
"options": {
"commands": ["node tools/utils/dotenv-pull.js -p moderation -v"]
},
"parallel": false,
"cwd": "."
},
"dotenv-build": {
"executor": "nx:run-commands",
"options": {
"commands": ["cd apps/moderation && dotenv-vault build"]
},
"cwd": ".",
"parallel": false
},
"dotenv-keys": {
"executor": "nx:run-commands",
"options": {
"commands": ["cd apps/moderation && dotenv-vault keys"]
},
"cwd": ".",
"parallel": false
}
},
"tags": ["scope:moderation", "type:app", "platform:server"]
}
Finally, let's declare some dedicated environment files containing variables for the end-to-end tests to configure the application and the Ory containers.
# apps/moderation/.env.test
# test@v1
MONGODB_URI=mongodb://localhost:27017/moderation_test
NODE_ENV=test
OPENAI_API_KEY=sk-xxxxx
ORY_HYDRA_ADMIN_URL=http://localhost:4434
ORY_HYDRA_PUBLIC_URL=http://localhost:4433
ORY_KETO_ADMIN_URL=http://localhost:4467
ORY_KETO_PUBLIC_URL=http://localhost:4466
ORY_KRATOS_ADMIN_URL=http://localhost:4434
ORY_KRATOS_PUBLIC_URL=http://localhost:4433
PORT=3090
REDIS_URL=redis://localhost:6379
RMQ_URL=amqp://localhost:5672
RMQ_MANAGEMENT_API_URL=http://localhost:15672/api
SERVER_URL=http://localhost:3090
# .env.test
PROXY_SERVER_URLS=http://localhost:8080
FRONTEND_URL=http://localhost
FRONTEND=host.docker.internal
FRONTEND_PORT=4200
AUTH_SERVICE=host.docker.internal
AUTH_SERVICE_PORT=3000
MODERATIONS_SERVICE=host.docker.internal
MODERATIONS_SERVICE_PORT=3090
ORDERS_SERVICE=host.docker.internal
ORDERS_SERVICE_PORT=3020
PAYMENTS_SERVICE=host.docker.internal
PAYMENTS_SERVICE_PORT=3040
TICKETS_SERVICE=host.docker.internal
TICKETS_SERVICE_PORT=3010
DOMAIN=localhost
CONNECT_SRC="http://localhost:4455 http://localhost:4433 http://localhost:4000 http://localhost:8080"
DEFAULT_SRC="http://localhost:4455 http://localhost:4433 http://localhost:4000 http://localhost:8080"
MEDIA_SRC=""
SCRIPT_SRC="'unsafe-inline'"
STYLE_SRC=""
STYLE_SRC_ELEM=""
STYLE_SRC_ATTR=""
FONT_SRC="data:"
FRAME_SRC="http://localhost:4455 http://localhost:4433 http://localhost:4000"
IMG_SRC=""
FORM_ACTION="http://localhost:8080"
# ORY
log_level="trace"
# ORY KRATOS
kratos_dsn="memory"
identity_schemas_default="file:///etc/config/kratos/identity.schema.test.json"
selfservice_default_browser_return_url="http://127.0.0.1:8080/"
selfservice_allowed_return_urls="http://127.0.0.1:8080, http://127.0.0.1:4455"
selfservice_flows_ui_base_url="http://127.0.0.1:4455"
selfservice_flows_errors_ui_url="http://127.0.0.1:4455/error"
selfservice_flows_settings_ui_url="http://127.0.0.1:4455/settings"
selfservice_flows_login_ui_url="http://127.0.0.1:4455/login"
selfservice_flows_registration_ui_url="http://127.0.0.1:4455/register"
selfservice_flows_recovery_ui_url="http://127.0.0.1:4455/recovery"
selfservice_flows_verification_ui_url="http://127.0.0.1:4455/verification"
selfservice_flows_login_after_hook_config_url="http://host.docker.internal:8080/api/users/on-sign-in"
selfservice_flows_login_after_hook_config_auth_config_value="unsecure_api_key"
selfservice_flows_login_after_hook_config_can_interrupt="false"
selfservice_flows_login_after_hook_config_response_ignore="true"
selfservice_flows_login_after_hook_config_response_parse="false"
selfservice_flows_registration_after_hook_config_url="http://host.docker.internal:8080/api/users/on-sign-up"
selfservice_flows_registration_after_hook_config_auth_config_value="unsecure_api_key"
selfservice_flows_registration_after_hook_config_can_interrupt="false"
selfservice_flows_registration_after_hook_config_response_ignore="true"
selfservice_flows_registration_after_hook_config_response_parse="false"
secrets_cookie="cookie_secret_not_good_not_secure"
secrets_cipher="32-LONG-SECRET-NOT-SECURE-AT-ALL"
serve_admin_base_url="http://kratos:4434/"
serve_public_base_url="http://127.0.0.1:4433/"
serve_public_cors_enabled="true"
serve_public_cors_allowed_origins="http://127.0.0.1:4433, http://127.0.0.1:4455, http://127.0.0.1:8080"
# ORY KETO
keto_dsn="memory"
This test suite will focus on reproducing interactions between the `tickets` and the `moderation` micro-services. We will declare a temporary queue in RabbitMQ and use the `amqplib` library to create a producer to send messages to the queue that the `TicketsMSController` will consume.
This time, we will use the `createNestApplication` and `createNestMicroservice` methods from the `TestingModule` to create the application and the microservice.
We will assert the following scenarios:
- When an invalid payload comes from the `tickets` micro-service, the `moderations` service should reply with an `AcceptableError` and nack the message.
- When a valid payload from the `tickets` micro-service is received, the `moderations` service should create a moderation job, send an internal event, and ack the message.
Solution
// apps/moderation/test/tickets-ms.controller.e2e-spec.ts
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable max-lines-per-function */
import { INestMicroservice } from "@nestjs/common";
import { ConfigModule, ConfigService } from "@nestjs/config";
import { LazyModuleLoader } from "@nestjs/core";
import { EventEmitter2, EventEmitterModule } from "@nestjs/event-emitter";
import { CustomStrategy } from "@nestjs/microservices";
import { getModelToken, MongooseModule } from "@nestjs/mongoose";
import {
FastifyAdapter,
NestFastifyApplication,
} from "@nestjs/platform-fastify";
import { Test, TestingModule } from "@nestjs/testing";
import { AmqpClient, AmqpServer } from "@s1seven/nestjs-tools-amqp-transport";
import { loadEnv, validate } from "@ticketing/microservices/shared/env";
import { Patterns } from "@ticketing/microservices/shared/events";
// eslint-disable-next-line @nx/enforce-module-boundaries
import {
type RmqManagerService,
getReplyQueueName,
} from "@ticketing/microservices/shared/rmq";
import { Services } from "@ticketing/shared/constants";
import { Ticket, TicketStatus } from "@ticketing/shared/models";
import { Model, Types } from "mongoose";
import { catchError, lastValueFrom, of } from "rxjs";
import { EnvironmentVariables } from "../src/app/env";
import { TICKET_CREATED_EVENT } from "../src/app/shared/events";
import {
Ticket as TicketModel,
TicketDocument,
} from "../src/app/tickets/schemas";
import { TicketsModule } from "../src/app/tickets/tickets.module";
// End-to-end tests for the tickets message handler of the moderation service.
// An AMQP client publishes `ticket.created` messages on a dedicated test queue,
// which an AMQP server strategy attached to the TicketsModule consumes.
describe("TicketsMSController (e2e)", () => {
const envFilePath = "apps/moderation/.env.test";
const envVariables = loadEnv(envFilePath, true);
// Queue names carry a _TEST suffix, distinguishing them from the names built without it.
const moderationQueue = `${Services.MODERATION_SERVICE}_QUEUE_TEST`;
const moderationReplyQueue = `${getReplyQueueName(
Services.MODERATION_SERVICE,
Services.TICKETS_SERVICE,
)}_TEST`;
let app: NestFastifyApplication;
let microservice: INestMicroservice;
let ticketRmqPublisher: AmqpClient;
let ticketModel: Model<TicketDocument>;
let eventEmitter: EventEmitter2;
let rmqManager: RmqManagerService;
beforeAll(async () => {
const moduleFixture: TestingModule = await Test.createTestingModule({
imports: [
ConfigModule.forRoot({
isGlobal: true,
expandVariables: true,
envFilePath,
validate: validate(EnvironmentVariables),
load: [() => envVariables],
}),
TicketsModule,
MongooseModule.forRoot(envVariables["MONGODB_URI"]),
EventEmitterModule.forRoot({
wildcard: true,
delimiter: "/",
}),
],
}).compile();
app = moduleFixture.createNestApplication(new FastifyAdapter());
const configService = app.get(ConfigService);
const rmqUrl = configService.get("RMQ_URL") as string;
// Producer side: plays the role of the tickets micro-service.
ticketRmqPublisher = new AmqpClient({
urls: [rmqUrl],
persistent: false,
noAck: true,
queue: moderationQueue,
replyQueue: moderationReplyQueue,
queueOptions: {
durable: false,
exclusive: false,
autoDelete: false,
},
replyQueueOptions: {
durable: false,
exclusive: true,
autoDelete: false,
},
socketOptions: {
keepAlive: true,
heartbeatIntervalInSeconds: 30,
reconnectTimeInSeconds: 1,
},
});
// Consumer side: `noAck: false`, so messages need an explicit ack/nack.
const options: CustomStrategy = {
strategy: new AmqpServer({
urls: [rmqUrl],
persistent: false,
noAck: false,
queue: moderationQueue,
queueOptions: {
durable: false,
exclusive: false,
autoDelete: false,
},
socketOptions: {
keepAlive: true,
heartbeatIntervalInSeconds: 30,
reconnectTimeInSeconds: 1,
},
}),
};
microservice = moduleFixture.createNestMicroservice(options);
ticketModel = app.get(getModelToken(TicketModel.name));
eventEmitter = app.get(EventEmitter2);
// RmqManagerModule is imported dynamically and registered through the LazyModuleLoader.
const lazyModuleLoader = app.get(LazyModuleLoader);
const { RmqManagerModule, RmqManagerService } = await import(
"@ticketing/microservices/shared/rmq"
);
const moduleRef = await lazyModuleLoader.load(() =>
RmqManagerModule.forRoot({
apiUrl: configService.get("RMQ_MANAGEMENT_API_URL"),
username: "guest",
password: "guest",
}),
);
rmqManager = moduleRef.get(RmqManagerService);
await microservice.listen();
await app.init();
});
// Optional chaining guards against a beforeAll failure leaving these undefined.
afterAll(async () => {
ticketRmqPublisher?.close();
await microservice?.close();
await app?.close();
});
describe("should be defined", () => {
it("should be defined", () => {
expect(app).toBeDefined();
expect(microservice).toBeDefined();
expect(ticketRmqPublisher).toBeDefined();
expect(ticketModel).toBeDefined();
});
});
describe("ticket.created", () => {
it("should NOT create ticket when event data is invalid", async () => {
// Wrong types on purpose: numeric title, string price and string version.
const ticket = {
id: new Types.ObjectId().toHexString(),
title: 3000,
price: "not a price",
version: "invalid version",
};
// act: the error is converted into a regular emission so it can be inspected.
const response = await lastValueFrom(
ticketRmqPublisher
.send(Patterns.TicketCreated, ticket)
.pipe(catchError((err) => of(err))),
);
// assert
expect(response).toHaveProperty("name", "AcceptableError");
expect(response).toHaveProperty("statusCode", 400);
expect(response).toHaveProperty("path", Patterns.TicketCreated);
expect(response).toHaveProperty("details");
expect(response).toHaveProperty("errors");
// The queue is expected to be empty — presumably because the message was nacked without requeue; confirm against the handler.
const data = await rmqManager.getMessages(moderationQueue);
expect(data).toHaveLength(0);
const createdTicket = await ticketModel.findOne({ _id: ticket.id });
expect(createdTicket).toBeNull();
});
it("should create ticket when event data is valid", async () => {
const ticket: Ticket = {
id: new Types.ObjectId().toHexString(),
title: "valid title",
price: 100,
version: 0,
status: TicketStatus.WaitingModeration,
userId: new Types.ObjectId().toHexString(),
};
let eventEmitted = false;
// Register the listener before publishing so the internal event cannot be missed.
eventEmitter.once(TICKET_CREATED_EVENT, () => {
eventEmitted = true;
});
const response = await lastValueFrom(
ticketRmqPublisher.send(Patterns.TicketCreated, ticket),
);
// assert
expect(response).toHaveProperty("ok", true);
expect(eventEmitted).toBe(true);
// The queue is expected to be empty — presumably because the message was acked; confirm against the handler.
const data = await rmqManager.getMessages(moderationQueue);
expect(data).toHaveLength(0);
const createdTicket = await ticketModel.findOne({ _id: ticket.id });
expect(createdTicket?._id?.toString()).toEqual(ticket.id);
});
});
});
Note:
- It is a NestJS convention to store the end-to-end tests in the `test` directory of the application.
- Don't forget to start all the docker containers before running the end-to-end tests.
- Once more, we will use the `LazyModuleLoader` to load the `RmqManagerModule` and `RmqManagerService`, which are used to manage the RabbitMQ queues and messages.
This test suite will focus on reproducing interactions through the moderation HTTP API. We will use the underlying `inject` method of the `fastify` framework to send fake HTTP requests to the `ModerationsController` and assert the responses.
The setup for this test suite is a bit more involved than the previous one.
- We will use the `overrideProvider` and `overrideModule` methods to turn off the tasks running periodically in the background. Similar methods exist for guards (`overrideGuard`), interceptors (`overrideInterceptor`), filters (`overrideFilter`), and pipes (`overridePipe`).
- We will create several helper functions to register and authenticate users on Ory Kratos, create a moderation in MongoDB, and declare all the required relations in Ory Keto.
Note: When using the Express adapter, it is more common to use
supertest
as shown in the NestJS docs
The tests will verify the following scenarios:
- When a request with an invalid parameter hits the `GET /moderations` endpoint, the `moderations` application should reply with a `400 Bad Request` error.
- When an unauthenticated user tries to call `GET /moderations/:id`, the `moderations` application should reply with a `401 Unauthorized` error.
- When an unauthorized user tries to call `GET /moderations/:id`, the `moderations` application should reply with a `403 Forbidden` error.
- When calling `GET /moderations/:id` with valid parameters and the moderation exists in the database, the `moderations` application should reply with a `200 OK` response.
Solution
// apps/moderation/test/moderations.controller.e2e-spec.ts
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable max-lines-per-function */
import { OryRelationshipsService } from "@getlarge/keto-client-wrapper";
import {
createRelationQuery,
relationTupleBuilder,
} from "@getlarge/keto-relations-parser";
import {
OryFrontendService,
OryIdentitiesModule,
OryIdentitiesService,
} from "@getlarge/kratos-client-wrapper";
import { createMock } from "@golevelup/ts-jest";
import { DynamicModule } from "@nestjs/common";
import { ConfigModule, ConfigService } from "@nestjs/config";
import { EventEmitterModule } from "@nestjs/event-emitter";
import { getModelToken, MongooseModule } from "@nestjs/mongoose";
import {
FastifyAdapter,
NestFastifyApplication,
} from "@nestjs/platform-fastify";
import { ScheduleModule } from "@nestjs/schedule";
import { Test, TestingModule } from "@nestjs/testing";
import { Identity } from "@ory/client";
import { AsyncLocalStorageModule } from "@ticketing/microservices/shared/async-local-storage";
import { loadEnv, validate } from "@ticketing/microservices/shared/env";
import { PermissionNamespaces } from "@ticketing/microservices/shared/models";
import { Resources } from "@ticketing/shared/constants";
import { Moderation, ModerationStatus } from "@ticketing/shared/models";
import { randomBytes } from "crypto";
import { Model, Types } from "mongoose";
import { type AppConfigService, EnvironmentVariables } from "../src/app/env";
import { ModerationsModule } from "../src/app/moderations/moderations.module";
import { ModerationsTasks } from "../src/app/moderations/moderations.tasks";
import {
Moderation as ModerationSchema,
ModerationDocument,
} from "../src/app/moderations/schemas";
/**
 * Empty global module with no providers and no exports. The test module
 * builder swaps it in for `ScheduleModule` via `overrideModule`.
 */
class DummyScheduleModule {
  /** Returns a dynamic module definition that registers nothing. */
  static forRoot(): DynamicModule {
    const emptyModule: DynamicModule = {
      module: DummyScheduleModule,
      global: true,
      providers: [],
      exports: [],
    };
    return emptyModule;
  }
}
/**
 * Logs a user in through a native login flow using the password method.
 *
 * @param deps - the `OryFrontendService` used to create and submit the flow
 * @param credentials - email (sent as the identifier) and password
 * @returns the `session_token` of the login response, cast to string
 */
const login = async (
  { oryFrontendService }: { oryFrontendService: OryFrontendService },
  { email, password }: { email: string; password: string },
): Promise<string> => {
  const flowResponse = await oryFrontendService.createNativeLoginFlow();
  const loginResponse = await oryFrontendService.updateLoginFlow({
    flow: flowResponse.data.id,
    updateLoginFlowBody: {
      method: "password",
      identifier: email,
      password,
    },
  });
  return loginResponse.data.session_token as string;
};
/**
 * Registers an identity through a native registration flow, stores the
 * application user id in the identity's public metadata, then logs in.
 *
 * @param deps - frontend service (registration/login) and identities service
 *   (identity update)
 * @param user - email, password and the application-level `userId`
 * @returns the identity returned by the registration flow and a session token
 */
const createOryUser = async (
  deps: {
    oryFrontendService: OryFrontendService;
    oryIdentityService: OryIdentitiesService;
  },
  user: { email: string; password: string; userId: string },
): Promise<{ identity: Identity; sessionToken: string }> => {
  const { oryFrontendService, oryIdentityService } = deps;
  const { email, password, userId } = user;

  const flowResponse = await oryFrontendService.createNativeRegistrationFlow();
  const registration = await oryFrontendService.updateRegistrationFlow({
    flow: flowResponse.data.id,
    updateRegistrationFlowBody: {
      method: "password",
      traits: { email },
      password,
    },
  });
  const identity = registration.data.identity;

  // Keep schema, traits and state as they are; only metadata_public is new.
  await oryIdentityService.updateIdentity({
    id: identity.id,
    updateIdentityBody: {
      metadata_public: { id: userId },
      schema_id: identity.schema_id,
      traits: identity.traits,
      state: identity.state,
    },
  });

  const sessionToken = await login({ oryFrontendService }, { email, password });
  return { identity, sessionToken };
};
/**
 * Creates the relationship that makes the given user one of the `members`
 * of the `admin` group.
 *
 * @param deps - the `OryRelationshipsService` used to create the relationship
 * @param params - `userId` of the user to add
 */
const createOryAdminRelation = async (
  { oryRelationshipsService }: { oryRelationshipsService: OryRelationshipsService },
  { userId }: { userId: string },
): Promise<void> => {
  const adminMembership = relationTupleBuilder()
    .subject(PermissionNamespaces[Resources.USERS], userId)
    .isIn("members")
    .of(PermissionNamespaces[Resources.GROUPS], "admin")
    .toJSON();
  const createRelationshipBody =
    createRelationQuery(adminMembership).unwrapOrThrow();
  await oryRelationshipsService.createRelationship({ createRelationshipBody });
};
/**
 * Creates the relationship that puts the `members` of the `admin` group in
 * the `editors` relation of the given moderation.
 *
 * @param deps - the `OryRelationshipsService` used to create the relationship
 * @param params - `moderationId` of the moderation to grant access to
 */
const createOryModerationRelation = async (
  {
    oryRelationshipsService,
  }: {
    oryRelationshipsService: OryRelationshipsService;
  },
  { moderationId }: { moderationId: string },
): Promise<void> => {
  const relationTuple = relationTupleBuilder()
    .subject(PermissionNamespaces[Resources.GROUPS], "admin", "members")
    .isIn("editors")
    .of(PermissionNamespaces[Resources.MODERATIONS], moderationId)
    // Serialise the builder into a plain tuple before building the query,
    // exactly as createOryAdminRelation does.
    .toJSON();
  await oryRelationshipsService.createRelationship({
    createRelationshipBody: createRelationQuery(relationTuple).unwrapOrThrow(),
  });
};
/**
 * Stores a pending moderation that points at a freshly generated ticket id,
 * then creates the matching moderation relationship.
 *
 * @param deps - the moderation model and the `OryRelationshipsService`
 * @returns the stored moderation converted with `toJSON`
 */
const createModeration = async (deps: {
  moderationModel: Model<ModerationDocument>;
  oryRelationshipsService: OryRelationshipsService;
}): Promise<Moderation> => {
  const { moderationModel, oryRelationshipsService } = deps;
  const document = await moderationModel.create({
    ticket: new Types.ObjectId(),
    status: ModerationStatus.Pending,
  });
  const moderation = document.toJSON<Moderation>();
  const moderationId = moderation.id;
  await createOryModerationRelation({ oryRelationshipsService }, { moderationId });
  return moderation;
};
describe("ModerationsController (e2e)", () => {
// Environment for the suite, loaded once from the test env file.
const envFilePath = "apps/moderation/.env.test";
const envVariables = loadEnv(envFilePath, true);

// Application under test and the providers resolved from it in beforeAll.
let app: NestFastifyApplication;
let moderationModel: Model<ModerationDocument>;
let oryFrontendService: OryFrontendService;
let oryIdentityService: OryIdentitiesService;
let oryRelationshipsService: OryRelationshipsService;

// Two users: the "valid" one is added to the admin group in beforeAll, the
// "invalid" one receives no relationship at all.
const validUserId = new Types.ObjectId().toHexString();
const invalidUserId = new Types.ObjectId().toHexString();
let validUserCredentials: { identity: Identity; sessionToken: string };
let invalidUserCredentials: { identity: Identity; sessionToken: string };

// Moderation created in beforeAll and requested by the tests.
let exampleModeration: Moderation;
// Builds the application under test, creates two Ory users (one of them an
// admin group member) and one moderation, then initialises the application.
beforeAll(async () => {
  // Throwaway credentials: random hex email local part and random password.
  const randomCredentials = (): { email: string; password: string } => ({
    email: `${randomBytes(8).toString("hex")}@example.com`,
    password: randomBytes(8).toString("hex"),
  });

  const testingModuleBuilder = Test.createTestingModule({
    imports: [
      ConfigModule.forRoot({
        isGlobal: true,
        expandVariables: true,
        envFilePath,
        validate: validate(EnvironmentVariables),
        load: [() => envVariables],
      }),
      ModerationsModule,
      MongooseModule.forRoot(envVariables["MONGODB_URI"]),
      EventEmitterModule.forRoot({
        wildcard: true,
        delimiter: "/",
      }),
      AsyncLocalStorageModule.forRoot(),
      OryIdentitiesModule.forRootAsync({
        inject: [ConfigService],
        useFactory: (configService: AppConfigService) => ({
          basePath: configService.get("ORY_KRATOS_ADMIN_URL"),
          accessToken: configService.get("ORY_KRATOS_API_KEY"),
        }),
      }),
    ],
  });

  // Swap the scheduler module for the empty dummy and mock the tasks provider.
  const moduleFixture: TestingModule = await testingModuleBuilder
    .overrideModule(ScheduleModule)
    .useModule(DummyScheduleModule)
    .overrideProvider(ModerationsTasks)
    .useValue(createMock<ModerationsTasks>())
    .compile();

  app = moduleFixture.createNestApplication(new FastifyAdapter());
  moderationModel = app.get(getModelToken(ModerationSchema.name));
  oryFrontendService = app.get(OryFrontendService);
  oryIdentityService = app.get(OryIdentitiesService);
  oryRelationshipsService = app.get(OryRelationshipsService);

  const userDeps = { oryFrontendService, oryIdentityService };

  validUserCredentials = await createOryUser(userDeps, {
    ...randomCredentials(),
    userId: validUserId,
  });
  await createOryAdminRelation(
    { oryRelationshipsService },
    { userId: validUserId },
  );

  invalidUserCredentials = await createOryUser(userDeps, {
    ...randomCredentials(),
    userId: invalidUserId,
  });

  exampleModeration = await createModeration({
    moderationModel,
    oryRelationshipsService,
  });

  await app.init();
});
// Closes the application, then deletes the identities created in beforeAll
// (valid user first, then invalid user).
afterAll(async () => {
  await app?.close();
  for (const credentials of [validUserCredentials, invalidUserCredentials]) {
    const identityId = credentials?.identity.id;
    if (identityId) {
      await oryIdentityService?.deleteIdentity({ id: identityId });
    }
  }
});
describe("GET /moderations", () => {
  // Builds the Authorization header for a session token.
  const bearer = (sessionToken: string): Record<string, string> => ({
    Authorization: `Bearer ${sessionToken}`,
  });

  // Injects GET /moderations; the `headers` key is only present when given.
  const listModerations = (headers?: Record<string, string>) =>
    app.inject({
      method: "GET",
      url: "/moderations",
      ...(headers ? { headers } : {}),
    });

  it("should return 401 when not authenticated", async () => {
    const { statusCode } = await listModerations();
    expect(statusCode).toBe(401);
  });

  it("should return 403 when not authorized", async () => {
    const { statusCode } = await listModerations(
      bearer(invalidUserCredentials.sessionToken),
    );
    expect(statusCode).toBe(403);
  });

  it("should return 200 when authorized", async () => {
    const { statusCode } = await listModerations(
      bearer(validUserCredentials.sessionToken),
    );
    expect(statusCode).toBe(200);
  });
});
describe("GET /moderations/:id", () => {
  // Builds the Authorization header for a session token.
  const bearer = (sessionToken: string): Record<string, string> => ({
    Authorization: `Bearer ${sessionToken}`,
  });

  // Injects GET /moderations/<example id>; the `headers` key is only present
  // when given. The id is read at call time, after beforeAll has run.
  const getExampleModeration = (headers?: Record<string, string>) =>
    app.inject({
      method: "GET",
      url: `/moderations/${exampleModeration.id}`,
      ...(headers ? { headers } : {}),
    });

  it("should return 401 when not authenticated", async () => {
    const { statusCode } = await getExampleModeration();
    expect(statusCode).toBe(401);
  });

  it("should return 403 when not authorized", async () => {
    const { statusCode } = await getExampleModeration(
      bearer(invalidUserCredentials.sessionToken),
    );
    expect(statusCode).toBe(403);
  });

  it("should return 200 when authorized", async () => {
    const { statusCode } = await getExampleModeration(
      bearer(validUserCredentials.sessionToken),
    );
    expect(statusCode).toBe(200);
  });

  // The document is deleted first, so a 200 here cannot come from MongoDB.
  it("should return 200 using cached value", async () => {
    await moderationModel.deleteOne({ _id: exampleModeration.id });

    const { statusCode } = await getExampleModeration(
      bearer(validUserCredentials.sessionToken),
    );
    expect(statusCode).toBe(200);
  });
});
});
Note: Before running the end-to-end tests, make sure to start all the docker containers with `yarn docker:deps:up` and `yarn docker:ory:up`.
Let's project into the future and imagine that our moderation app is a huge success. We have a lot of moderation tasks to handle, and we need to scale our moderation app to handle the load, ensure continuous service, and reduce latency. One way to achieve this is by using clustering.
-
Performance Improvements: Node.js, by default, runs on a single thread. However, clustering can spawn a child process for every CPU core, making better use of multicore systems. This ensures your application can handle more load and provides better performance.
-
Fault Tolerance: Clustering increases fault tolerance. If a worker dies, the primary process can spawn a new worker, ensuring continuous service and minimizing downtime.
-
Load Balancing: The cluster module allows you to distribute the workload among different worker processes. It can also share server ports among these processes.
-
Better Resource Utilization: Clustering leads to better resource utilization, as it uses all available cores and not just one. It provides enhanced throughput and efficiency.
-
Scalability: Node.js clustering lets you quickly scale your application to meet increased traffic demand.
Note: You can find a simple benchmark for a NestJS app using clustering here
-
State Sharing: Each worker process is a separate Node.js instance with its memory space in a clustered environment. They do not share a state. You'll need a way to share the state, such as a database, Redis, or other methods. Generally speaking, the application should follow the 12-factor app principles
-
Complexity: Managing separate worker processes adds complexity to your code. Also, maintaining the synchronization between these worker processes requires careful attention and programming.
-
Shared Resources: If your application uses files or other system resources, you may encounter problems, as direct file access is unsafe in a clustered environment.
-
High Memory Usage: Depending on your application, using the cluster module might increase your application's memory usage.
-
Debugging Difficulty: Debugging a clustered environment can be more difficult, as it may not be immediately apparent which worker process is causing problems.
-
Not a Silver Bullet: While clustering can help improve your application's performance, it’s not always the best solution. For CPU-bound tasks, offloading the work to a different service or using a worker threads module may be a more prudent choice.
Note: Some concrete pitfalls we could encounter in our ticketing apps are the following:
- AMQP reply queue names must be unique per worker (use random suffix in queue name)
- Tasks managed by a NestJS scheduler should (generally) be handled by a single worker (see Scheduler section for an example using Redis lock)
In this exercise, we will learn how to use clustering in a Node.js application and apply it to our moderation app.
The goal is that we should be able to run the moderation app in cluster mode by setting the CLUSTER_MODE
environment variable to true
and optionally define the maximum number of workers using MAX_WORKERS
.
I wrote a module (ClusterService
) to help you with this exercise, which you can find here. The ClusterService
class is a wrapper around the native Node.js cluster
module.
While it provides sane default options and a simple API to manage the cluster, it is also highly customizable.
yarn add @getlarge/nestjs-tools-cluster
Solution
// apps/moderation/src/main.ts
import { AmqpOptions, AmqpServer } from "@getlarge/nestjs-tools-amqp-transport";
import {
ClusterService,
ClusterServiceConfig,
} from "@getlarge/nestjs-tools-cluster";
import { LockService } from "@getlarge/nestjs-tools-lock";
import { Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { LazyModuleLoader, NestFactory } from "@nestjs/core";
import { CustomStrategy } from "@nestjs/microservices";
import {
FastifyAdapter,
NestFastifyApplication,
} from "@nestjs/platform-fastify";
import { GLOBAL_API_PREFIX } from "@ticketing/microservices/shared/constants";
import { Services } from "@ticketing/shared/constants";
import { AppModule } from "./app/app.module";
import { type AppConfigService } from "./app/env";
import { GlobalFilter } from "./app/filters/global.filter";
import { GlobalGuard } from "./app/guards/global.guard";
import { GlobalInterceptor } from "./app/interceptors/global.interceptor";
import { globalMiddleware } from "./app/middlewares/global.middleware";
import { GlobalPipe } from "./app/pipes/global.pipe";
/** Port used by bootstrap when the configuration provides no PORT. */
const DEFAULT_PORT = 3090;
/** Cluster mode is opt-in: only the exact string "true" enables it. */
const CLUSTER_MODE = process.env.CLUSTER_MODE === "true";
/** Worker count; falls back to 2 when MAX_WORKERS is unset, non-numeric or 0. */
const MAX_WORKERS = Number(process.env.MAX_WORKERS) || 2;
/**
 * Declares the RabbitMQ policy and the dead-letter exchange of the
 * moderation service through the RabbitMQ management module, which is
 * imported dynamically and loaded with the `LazyModuleLoader`.
 *
 * @param app - the application used to resolve `LazyModuleLoader` and
 *   `ConfigService`
 */
async function createRmqPolicy(app: NestFastifyApplication): Promise<void> {
  const vhost = "/";
  const policyName = "MODERATION_SERVICE_DLX_POLICY";
  const deadLetterExchange = "MODERATION_SERVICE_DEAD_LETTER_EXCHANGE";

  const lazyModuleLoader = app.get(LazyModuleLoader);
  const configService = app.get<AppConfigService>(ConfigService);
  const rmq = await import("@ticketing/microservices/shared/rmq");
  const moduleRef = await lazyModuleLoader.load(() =>
    rmq.RmqManagerModule.forRoot({
      apiUrl: configService.get("RMQ_MANAGEMENT_API_URL"),
      username: "guest",
      password: "guest",
    }),
  );
  const rmqManager = moduleRef.get(rmq.RmqManagerService);

  // Policy applied to queue names matching the pattern below.
  await rmqManager.setPolicy(
    policyName,
    {
      pattern: "MODERATION_SERVICE$",
      definition: {
        "message-ttl": 30000,
        "max-length": 1000,
        "dead-letter-exchange": deadLetterExchange,
      },
    },
    vhost,
  );
  // Durable topic exchange named in the policy's dead-letter-exchange key.
  await rmqManager.setExchange(
    deadLetterExchange,
    { autoDelete: false, durable: true },
    "topic",
    vhost,
  );
}
/**
 * Creates and starts the moderation application: a Fastify HTTP server plus
 * an AMQP microservice consuming the moderation service queue.
 *
 * Any error thrown during startup is logged and `disconnect` is called.
 *
 * @param opts - optional `workerId`, stored on `globalThis.__WORKER_ID__`
 * @param disconnect - callback invoked after a startup failure; by default
 *   it exits the process with code 1
 */
// eslint-disable-next-line max-lines-per-function
async function bootstrap(
  opts: { workerId?: number } = {},
  disconnect: () => void = () => process.exit(1),
): Promise<void> {
  /**
   * This is a global variable that will be used to identify the worker id
   * in the application. This is useful for debugging purposes.
   */
  globalThis.__WORKER_ID__ = opts.workerId;
  try {
    const app = await NestFactory.create<NestFastifyApplication>(
      AppModule,
      new FastifyAdapter({
        trustProxy: true,
        // falls back to 1048576 bytes when MAX_PAYLOAD_SIZE is unset, non-numeric or 0
        bodyLimit: +process.env.MAX_PAYLOAD_SIZE || 1048576,
      }),
      { bufferLogs: true, abortOnError: false },
    );
    app.setGlobalPrefix(GLOBAL_API_PREFIX);
    app.enableShutdownHooks();
    // global middleware, guard, interceptor, pipe and filter
    app.use(globalMiddleware);
    app.useGlobalGuards(new GlobalGuard());
    app.useGlobalInterceptors(new GlobalInterceptor());
    app.useGlobalPipes(new GlobalPipe());
    app.useGlobalFilters(new GlobalFilter());
    const configService = app.get<AppConfigService>(ConfigService);
    const port = configService.get("PORT", { infer: true }) ?? DEFAULT_PORT;
    // AMQP server options for the moderation service queue
    const amqpOptions: AmqpOptions = {
      urls: [configService.get("RMQ_URL") as string],
      persistent: true,
      noAck: false,
      prefetchCount: configService.get("RMQ_PREFETCH_COUNT"),
      isGlobalPrefetchCount: false,
      queue: `${Services.MODERATION_SERVICE}_QUEUE`,
      queueOptions: {
        durable: true,
        exclusive: false,
        autoDelete: false,
      },
      socketOptions: {
        keepAlive: true,
        heartbeatIntervalInSeconds: 30,
        reconnectTimeInSeconds: 1,
      },
    };
    const options: CustomStrategy = {
      strategy: new AmqpServer(amqpOptions),
    };
    const microService = app.connectMicroservice(options);
    // required to initialize modules (include LockModule) before opening listeners
    await app.init();
    // only one worker should create the policy
    // a failed lock attempt resolves to null, so that worker skips the policy creation
    const lock = await app
      .get(LockService)
      .lock("createRmqPolicy", 5000)
      .catch(() => null);
    if (lock) {
      try {
        await createRmqPolicy(app);
      } catch (error) {
        // a policy failure is logged but does not abort the startup
        Logger.error(error);
      }
    }
    // start the AMQP consumer first, then the HTTP listener
    await microService.listen();
    await app.listen(port, "0.0.0.0", () => {
      Logger.log(`Listening at http://localhost:${port}/${GLOBAL_API_PREFIX}`);
    });
  } catch (error) {
    Logger.error(error);
    disconnect();
  }
}
// Entry point: run a single process unless cluster mode is enabled, in which
// case ClusterService runs bootstrap with MAX_WORKERS workers.
if (!CLUSTER_MODE) {
  void bootstrap({}, () => {
    process.exit(1);
  });
} else {
  const clusterConfig: ClusterServiceConfig = {
    workers: MAX_WORKERS,
    delay: 2000,
    grace: 1000,
  };
  const clusterService = new ClusterService(clusterConfig);
  // A failure to clusterize is logged with the cluster logger, then the
  // process exits with code 1.
  clusterService.clusterize(bootstrap).catch((clusterError) => {
    clusterService.logger.error(clusterError);
    process.exit(1);
  });
}
Note: The `ClusterService` can be configured with the following options:
- `logger`: The logger instance. Default is `console`.
- `restartOnExit`: Whether to restart the worker process when it exits. Default is `false`.
- `showLogs`: Whether to display logs. Default is `false`.
- `workers`: The number of workers to spawn. Default is the number of CPU cores.
- `delay`: The delay in milliseconds between each worker spawn. Default is `0`.
- `lifetime`: The maximum lifetime of a worker in milliseconds. Default is `Infinity`.
- `grace`: The grace period in milliseconds to wait for a worker to exit. Default is `5000`.
- `signals`: The signals to listen for. Default is `['SIGTERM', 'SIGINT']`.
Why do we need API versioning for our HTTP API?
- To maintain backward compatibility
- To introduce new features
- To deprecate old features
- To support a stable API
Luckily, NestJS provides several strategies for API versioning:
- URL versioning
- Header versioning
- Media type versioning
You can set the version requirement at the application, controller, or route level.
NestJS extracts the version from the URL, query string, header, or media type during the request handling and dispatches the request to the appropriate controller or route. If the version defined in the request does not match the version required by the application, the application will reply with a 404 Not Found response.
// ...
const app = await NestFactory.create<NestFastifyApplication>(
  AppModule,
  new FastifyAdapter()
);
// ...
// Each call below illustrates a different way to configure versioning.
// to enable URL versioning (e.g., /v1/users)
app.enableVersioning({
  type: VersioningType.URI,
});
// to use a custom versioning header ( e.g., Custom-Header: 1)
app.enableVersioning({
  type: VersioningType.HEADER,
  header: 'Custom-Header',
});
// to use a custom media type (e.g., Accept header => application/json;v=2)
app.enableVersioning({
  type: VersioningType.MEDIA_TYPE,
  key: 'v=',
});
// to set version requirements at the application level
app.enableVersioning({
  // ... (one of the `type` configurations shown above)
  defaultVersion: '1',
  // or
  // defaultVersion: ['1', '2'],
  // or
  // defaultVersion: VERSION_NEUTRAL,
});
// ...
import { Controller, Get, Version, VERSION_NEUTRAL } from "@nestjs/common";
// ...
/**
 * Controller-level versioning with a single version: only requests with the
 * version '1' will be dispatched to this controller.
 */
@Controller({ version: "1" })
export class ModerationControllerV1 {
  // ...
}
/**
 * Controller-level versioning with several versions: requests with the
 * versions 1 or 2 will be dispatched to this controller.
 */
@Controller({ version: ["1", "2"] })
export class ModerationController {
  // ...
}
/**
 * Version-neutral controller: no version will be required in the request to
 * access this controller.
 */
@Controller({
  version: VERSION_NEUTRAL,
})
export class HealthController {
  // ...
}
/**
 * Route-level versioning: the controller itself declares no version, and
 * only requests with the version '1' will be dispatched to `findAll`.
 */
@Controller("users")
export class UsersController {
  // ...
  /** GET handler restricted to version '1' by the `@Version` decorator. */
  @Version("1")
  @Get()
  findAll() {
    // ...
  }
}
Note: Check the official documentation for more information.
Let's imagine we want to enrich our ticketing app by allowing users to upload an image when creating a ticket.
Dealing with File Upload in NestJS is straightforward, as the official documentation shows. However, when it comes to dealing with files when using Fastify adapter, it's a bit different.
To solve our pain, one potentially good candidate developed by one of the NestJS core team members is @nest-lab/fastify-multer. But it depends on fastify-multer, which is abandoned and not maintained anymore.
Another one is nest-file-fastify, which depends on fastify-multipart, and the Fastify team maintains it. But nest-file-fastify
is outdated and unmaintained.
I forked `nest-file-fastify` and updated its `fastify-multipart` dependency to the latest version so that we can use it in our project. You can find the fork under @getlarge/nestjs-tools-fastify-upload.
Now that we can handle file upload in our application, we can store the files. I suggest keeping the files locally in development mode and using a cloud storage service when the application is deployed.
I created a NestJS module to handle file storage, which you can find here. It follows the Strategy design pattern and provides a simple API to store and retrieve files from different storage providers. Currently, it supports the local file system and AWS S3. You can define the strategy in the module configuration to instantiate the right storage provider.
First, install the package and its peer dependencies
yarn add @getlarge/nestjs-tools-fastify-upload @getlarge/nestjs-tools-file-storage @aws-sdk/client-s3 @aws-sdk/lib-storage
Import and register @fastify/multipart
in the main.ts
file
Solution
// apps/tickets/src/main.ts
import "./vault";
import "reflect-metadata";
import fastifyCors from "@fastify/cors";
import { fastifyHelmet } from "@fastify/helmet";
import fastifyMultipart from "@fastify/multipart";
import { AmqpOptions, AmqpServer } from "@getlarge/nestjs-tools-amqp-transport";
import { ConfigService } from "@nestjs/config";
import { NestFactory } from "@nestjs/core";
import { CustomStrategy } from "@nestjs/microservices";
import {
FastifyAdapter,
NestFastifyApplication,
} from "@nestjs/platform-fastify";
import {
DocumentBuilder,
SwaggerCustomOptions,
SwaggerModule,
} from "@nestjs/swagger";
import {
bearerSecurityScheme,
GLOBAL_API_PREFIX,
SecurityRequirements,
sessionSecurityScheme,
} from "@ticketing/microservices/shared/constants";
import { Resources, Services } from "@ticketing/shared/constants";
import { Logger } from "nestjs-pino";
import { existsSync, writeFileSync } from "node:fs";
import { resolve } from "node:path";
import { AppModule } from "./app/app.module";
import { AppConfigService } from "./app/env";
import { APP_FOLDER, DEFAULT_PORT } from "./app/shared/constants";
/**
 * Decides whether a request origin is accepted by the development CORS
 * policy.
 *
 * Requests that carry no `Origin` header are accepted, because
 * `new URL(undefined)` would otherwise throw inside the CORS hook. A
 * malformed `Origin` value is rejected instead of throwing.
 */
const isAllowedDevOrigin = (origin: string | undefined): boolean => {
  if (!origin) {
    return true;
  }
  try {
    const { hostname } = new URL(origin);
    return hostname === "localhost" || hostname === "127.0.0.1";
  } catch {
    return false;
  }
};

/**
 * Creates and starts the tickets application: a Fastify HTTP server with
 * Helmet, multipart support, development-only CORS and Swagger UI, plus an
 * AMQP microservice consuming the tickets service queue. The OpenAPI
 * document is also written to `openapi.json` when `APP_FOLDER` exists.
 */
// eslint-disable-next-line max-lines-per-function
async function bootstrap(): Promise<void> {
  const app = await NestFactory.create<NestFastifyApplication>(
    AppModule,
    new FastifyAdapter({
      trustProxy: true,
      bodyLimit: 1048576,
    }),
    { bufferLogs: true, abortOnError: false },
  );
  const configService = app.get<AppConfigService>(ConfigService);
  const port = configService.get("PORT", DEFAULT_PORT, { infer: true });
  const environment = configService.get("NODE_ENV", { infer: true });
  const swaggerUiPrefix = configService.get("SWAGGER_PATH", { infer: true });
  const proxyServerUrls = configService.get("PROXY_SERVER_URLS", {
    infer: true,
  });
  const logger = app.get(Logger);
  app.useLogger(logger);
  app.setGlobalPrefix(GLOBAL_API_PREFIX);
  // Fastify
  await app.register(fastifyHelmet, {
    contentSecurityPolicy: {
      directives: {
        defaultSrc: [`'self'`],
        styleSrc: [`'self'`, `'unsafe-inline'`],
        imgSrc: [`'self'`, "data:", "validator.swagger.io"],
        scriptSrc: [`'self'`, `https: 'unsafe-inline'`],
      },
    },
  });
  await app.register(fastifyMultipart);
  // CORS is only registered in development when no proxy server is configured
  if (!proxyServerUrls.length && environment === "development") {
    await app.register(fastifyCors, {
      origin: (origin, cb) => {
        if (isAllowedDevOrigin(origin)) {
          cb(null, true);
          return;
        }
        cb(new Error("Not allowed"), false);
      },
      credentials: true,
      // allowedHeaders: ALLOWED_HEADERS,
      // exposedHeaders: EXPOSED_HEADERS,
      allowedHeaders: "*",
      exposedHeaders: "*",
    });
  }
  const amqpOptions: AmqpOptions = {
    urls: [configService.get("RMQ_URL") as string],
    persistent: true,
    noAck: false,
    prefetchCount: configService.get("RMQ_PREFETCH_COUNT"),
    isGlobalPrefetchCount: false,
    queue: `${Services.TICKETS_SERVICE}_QUEUE`,
    queueOptions: {
      durable: true,
      exclusive: false,
      autoDelete: false,
    },
    socketOptions: {
      keepAlive: true,
      heartbeatIntervalInSeconds: 30,
      reconnectTimeInSeconds: 1,
    },
  };
  const options: CustomStrategy = {
    strategy: new AmqpServer(amqpOptions),
  };
  const microService = app.connectMicroservice(options);
  // Swagger UI
  const documentBuilder = new DocumentBuilder()
    .setTitle("Tickets API")
    .setDescription("Ticketing tickets API description")
    .setVersion(configService.get("APP_VERSION"))
    .addSecurity(SecurityRequirements.Session, sessionSecurityScheme)
    .addSecurity(SecurityRequirements.Bearer, bearerSecurityScheme)
    .addSecurityRequirements(SecurityRequirements.Session)
    .addSecurityRequirements(SecurityRequirements.Bearer)
    .addServer(configService.get("SERVER_URL"))
    .addTag(Resources.TICKETS);
  if (proxyServerUrls.length) {
    for (const serverUrl of proxyServerUrls) {
      documentBuilder.addServer(serverUrl);
    }
  }
  const document = SwaggerModule.createDocument(app, documentBuilder.build());
  const customOptions: SwaggerCustomOptions = {
    swaggerOptions: {
      persistAuthorization: true,
    },
  };
  SwaggerModule.setup(swaggerUiPrefix, app, document, customOptions);
  // Save OpenAPI specs
  const openApiPath = resolve(APP_FOLDER, "openapi.json");
  // eslint-disable-next-line security/detect-non-literal-fs-filename
  if (existsSync(APP_FOLDER)) {
    // eslint-disable-next-line security/detect-non-literal-fs-filename
    writeFileSync(openApiPath, JSON.stringify(document, null, 2));
  }
  // Init
  await microService.listen();
  await app.listen(port, "0.0.0.0", () => {
    logger.log(`Listening at http://localhost:${port}/${GLOBAL_API_PREFIX}`);
    logger.log(
      `Access SwaggerUI at http://localhost:${port}/${swaggerUiPrefix}`,
    );
  });
}
// A startup failure is printed, then the process exits with code 1.
bootstrap().catch((error) => {
  console.error(error);
  process.exit(1);
});
Add the FileStorageModule
to the TicketsModule
and configure it to use the local file system in development mode and AWS S3 in production mode.
Solution
// apps/tickets/src/app/tickets/tickets.module.ts
import { OryOAuth2Module } from "@getlarge/hydra-client-wrapper";
import {
OryPermissionsModule,
OryRelationshipsModule,
} from "@getlarge/keto-client-wrapper";
import { OryFrontendModule } from "@getlarge/kratos-client-wrapper";
import { AmqpClient, AmqpOptions } from "@getlarge/nestjs-tools-amqp-transport";
import {
FileStorageLocal,
FileStorageLocalSetup,
FileStorageModule,
FileStorageS3,
FileStorageS3Setup,
MethodTypes,
} from "@getlarge/nestjs-tools-file-storage";
import { Module } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { APP_FILTER } from "@nestjs/core";
import {
ClientsModule,
CustomClientOptions,
Transport,
} from "@nestjs/microservices";
import { MongooseModule } from "@nestjs/mongoose";
import { GlobalErrorFilter } from "@ticketing/microservices/shared/filters";
import {
OryAuthenticationGuard,
OryOAuth2AuthenticationGuard,
} from "@ticketing/microservices/shared/guards";
import { getReplyQueueName } from "@ticketing/microservices/shared/rmq";
import { Environment, Services } from "@ticketing/shared/constants";
import { updateIfCurrentPlugin } from "mongoose-update-if-current";
import { mkdir, readdir } from "node:fs/promises";
import path from "node:path";
import { AppConfigService, EnvironmentVariables } from "../env";
import {
MODERATIONS_CLIENT,
ORDERS_CLIENT,
ORY_AUTH_GUARD,
ORY_OAUTH2_GUARD,
} from "../shared/constants";
import { Ticket, TicketSchema } from "./schemas/ticket.schema";
import { TicketsController } from "./tickets.controller";
import { TicketsService } from "./tickets.service";
import { TicketsMSController } from "./tickets-ms.controller";
/**
 * Registers the Ticket model; the factory attaches `updateIfCurrentPlugin`
 * to the shared `TicketSchema` and returns that same schema.
 */
const MongooseFeatures = MongooseModule.forFeatureAsync([
  {
    name: Ticket.name,
    inject: [ConfigService],
    useFactory: () => {
      TicketSchema.plugin(updateIfCurrentPlugin);
      return TicketSchema;
    },
  },
]);
/**
 * Builds the custom AMQP client options used by the tickets service to
 * reach another service.
 *
 * @param configService - source of `RMQ_URL` and `RMQ_PREFETCH_COUNT`
 * @param consumerService - service whose `<service>_QUEUE` is targeted
 * @returns client options using `AmqpClient` as the custom class
 */
const clientFactory = (
  configService: AppConfigService,
  consumerService: Services,
): CustomClientOptions => {
  const rmqUrl = configService.get("RMQ_URL") as string;
  const prefetchCount = configService.get("RMQ_PREFETCH_COUNT");
  const queue = `${consumerService}_QUEUE`;
  const replyQueue = getReplyQueueName(consumerService, Services.TICKETS_SERVICE);

  const amqpOptions: AmqpOptions = {
    urls: [rmqUrl],
    persistent: true,
    noAck: true,
    prefetchCount,
    isGlobalPrefetchCount: false,
    queue,
    replyQueue,
    queueOptions: { durable: true, exclusive: false, autoDelete: false },
    socketOptions: {
      keepAlive: true,
      heartbeatIntervalInSeconds: 30,
      reconnectTimeInSeconds: 1,
    },
  };
  return { customClass: AmqpClient, options: amqpOptions };
};
/**
 * Registers the microservice clients injected into the tickets service:
 * one for the orders service and one for the moderation service.
 * Both reuse `clientFactory` and only differ by the targeted service.
 */
const Clients = ClientsModule.registerAsync([
  {
    name: ORDERS_CLIENT,
    inject: [ConfigService],
    useFactory: (configService: AppConfigService) => ({
      ...clientFactory(configService, Services.ORDERS_SERVICE),
      transport: Transport.RMQ,
    }),
  },
  {
    name: MODERATIONS_CLIENT,
    inject: [ConfigService],
    useFactory: (configService: AppConfigService) => ({
      ...clientFactory(configService, Services.MODERATION_SERVICE),
      transport: Transport.RMQ,
    }),
  },
]);
/**
 * Maps a client-supplied file name to a path located inside `root`.
 *
 * The name is normalized and any leading `..` segments are stripped before it
 * is joined to `root`; the result is then verified to still live under `root`.
 *
 * @param fileName - untrusted, relative file name
 * @param root - directory the returned path must stay inside
 * @returns `root` joined with the sanitized file name
 * @throws Error "Invalid path" when `fileName` contains a NUL byte or the
 *   resulting path would escape `root`
 */
const sanitizePath = (fileName: string, root: string): string => {
  if (fileName.includes("\0")) {
    throw new Error("Invalid path");
  }
  const safeInput = path.normalize(fileName).replace(/^(\.\.(\/|\\|$))+/, "");
  const absoluteFilepath = path.join(root, safeInput);
  // Compare path segments rather than raw string prefixes: a prefix test
  // rejects valid files when `root` is not already normalized (e.g. "/store/./")
  // and would consider a sibling such as "/store-other" to be inside "/store".
  const relativeToRoot = path.relative(root, absoluteFilepath);
  const escapesRoot =
    relativeToRoot === ".." ||
    relativeToRoot.startsWith(`..${path.sep}`) ||
    path.isAbsolute(relativeToRoot);
  if (escapesRoot) {
    throw new Error("Invalid path");
  }
  return absoluteFilepath;
};
/**
 * Tickets feature module.
 *
 * Wires the Ticket Mongoose model, the RMQ clients, the file storage backend
 * and the Ory (Kratos, Keto, Hydra) clients, then declares the HTTP and
 * microservice controllers together with their providers.
 */
@Module({
imports: [
MongooseFeatures,
Clients,
// The storage backend is selected when the module is initialised, from NODE_ENV.
FileStorageModule.forRootAsync({
inject: [ConfigService],
useFactory: (configService: AppConfigService) => {
const environment = configService.get("NODE_ENV", { infer: true });
// Development: files are stored on the local disk under STORAGE_PATH.
if (environment === Environment.Development) {
const setup: FileStorageLocalSetup = {
storagePath: configService.get("STORAGE_PATH"),
maxPayloadSize: configService.get("MAX_PAYLOAD_SIZE"),
};
// Resolves the on-disk path for a storage operation; for write operations
// it also makes sure the parent directory exists.
const filePath = async (options: {
_req?: Request;
fileName: string;
methodType: MethodTypes;
}): Promise<string> => {
const { fileName, methodType } = options;
const root = path.resolve(setup.storagePath);
// Rejects names that would resolve outside of the storage root.
const absoluteFilepath = sanitizePath(fileName, root);
if (methodType !== MethodTypes.WRITE) {
return absoluteFilepath;
}
const { dir } = path.parse(absoluteFilepath);
try {
// eslint-disable-next-line security/detect-non-literal-fs-filename
await readdir(dir);
} catch (error) {
// NOTE(review): `error` is not narrowed before reading `.code`; this only
// type-checks when catch variables are not typed as `unknown`.
if (error.code === "ENOENT") {
// The directory does not exist yet: create it, including missing parents.
// eslint-disable-next-line security/detect-non-literal-fs-filename
await mkdir(dir, { recursive: true });
} else {
throw error;
}
}
return absoluteFilepath;
};
return new FileStorageLocal(setup, () => {
return {
filePath,
// maxPayloadSize is scaled by 1024 * 1024 — presumably configured in MiB; confirm.
limits: { fileSize: setup.maxPayloadSize * 1024 * 1024 },
};
});
}
// Every other environment: files are stored in an S3 bucket.
const setup: FileStorageS3Setup = {
maxPayloadSize: configService.get("MAX_PAYLOAD_SIZE"),
bucket: configService.get("AWS_S3_BUCKET"),
region: configService.get("AWS_S3_REGION"),
credentials: {
accessKeyId: configService.get("AWS_S3_ACCESS_KEY_ID"),
secretAccessKey: configService.get("AWS_S3_SECRET_ACCESS_KEY"),
},
};
return new FileStorageS3(setup);
},
}),
// Ory Kratos client, configured with the public URL.
OryFrontendModule.forRootAsync({
inject: [ConfigService],
useFactory: (
configService: ConfigService<EnvironmentVariables, true>,
) => ({
basePath: configService.get("ORY_KRATOS_PUBLIC_URL"),
}),
}),
// Ory Keto permission checks, configured with the public URL.
OryPermissionsModule.forRootAsync({
inject: [ConfigService],
useFactory: (
configService: ConfigService<EnvironmentVariables, true>,
) => ({
basePath: configService.get("ORY_KETO_PUBLIC_URL"),
}),
}),
// Ory Keto relationship management, configured with the admin URL and API key.
OryRelationshipsModule.forRootAsync({
inject: [ConfigService],
useFactory: (
configService: ConfigService<EnvironmentVariables, true>,
) => ({
accessToken: configService.get("ORY_KETO_API_KEY"),
basePath: configService.get("ORY_KETO_ADMIN_URL"),
}),
}),
// Ory Hydra OAuth2 client.
OryOAuth2Module.forRootAsync({
inject: [ConfigService],
useFactory: (
configService: ConfigService<EnvironmentVariables, true>,
) => ({
basePath: configService.get("ORY_HYDRA_PUBLIC_URL"),
accessToken: configService.get("ORY_HYDRA_API_KEY"),
}),
}),
],
controllers: [TicketsController, TicketsMSController],
providers: [
// APP_FILTER aliases the GlobalErrorFilter provider declared just below.
{
provide: APP_FILTER,
useExisting: GlobalErrorFilter,
},
GlobalErrorFilter,
TicketsService,
// The guards are registered under tokens so that they can be referenced by
// token (e.g. from OrGuard in the controller).
{
provide: ORY_AUTH_GUARD,
useClass: OryAuthenticationGuard(),
},
{
provide: ORY_OAUTH2_GUARD,
useClass: OryOAuth2AuthenticationGuard(),
},
],
exports: [MongooseFeatures, Clients, TicketsService],
})
export class TicketsModule {}
Create a new DTO to handle the file upload
Solution
// apps/tickets/src/app/tickets/models/upload-ticket-image.dto.ts
import type { StreamStorageFile } from "@getlarge/nestjs-tools-fastify-upload";
import { ApiProperty } from "@nestjs/swagger";
/**
 * Request body of the ticket image upload route.
 * Declared as a binary string in the OpenAPI schema.
 */
export class UploadTicketImageDto {
/** The uploaded file, sent under the `file` form field. */
@ApiProperty({ type: "string", format: "binary" })
file: StreamStorageFile;
}
Let's add two new routes to the `TicketsController`: one to upload the file and another to download it.
Solution
// apps/tickets/src/app/tickets/tickets.controller.ts
import { OryPermissionChecks } from "@getlarge/keto-client-wrapper";
import { relationTupleBuilder } from "@getlarge/keto-relations-parser";
import {
type StreamStorageFile,
FileInterceptor,
StreamStorage,
UploadedFile,
} from "@getlarge/nestjs-tools-fastify-upload";
import { OrGuard } from "@nest-lab/or-guard";
import {
applyDecorators,
Body,
Controller,
FileTypeValidator,
Get,
HttpStatus,
MaxFileSizeValidator,
Param,
ParseFilePipe,
Patch,
Post,
Query,
StreamableFile,
UseGuards,
UseInterceptors,
UsePipes,
ValidationPipe,
ValidationPipeOptions,
} from "@nestjs/common";
import {
ApiBearerAuth,
ApiBody,
ApiConsumes,
ApiCookieAuth,
ApiExtraModels,
ApiOperation,
ApiResponse,
ApiTags,
} from "@nestjs/swagger";
import { SecurityRequirements } from "@ticketing/microservices/shared/constants";
import {
ApiNestedQuery,
ApiPaginatedDto,
CurrentUser,
} from "@ticketing/microservices/shared/decorators";
import {
OryAuthenticationGuard,
OryAuthorizationGuard,
} from "@ticketing/microservices/shared/guards";
import {
PaginatedDto,
PaginateDto,
PaginateQuery,
PermissionNamespaces,
} from "@ticketing/microservices/shared/models";
import {
ParseObjectId,
ParseQuery,
} from "@ticketing/microservices/shared/pipes";
import {
Actions,
CURRENT_USER_KEY,
Resources,
} from "@ticketing/shared/constants";
import { requestValidationErrorFactory } from "@ticketing/shared/errors";
import { User } from "@ticketing/shared/models";
import type { FastifyRequest } from "fastify/types/request";
import { ORY_AUTH_GUARD, ORY_OAUTH2_GUARD } from "../shared/constants";
import {
CreateTicket,
CreateTicketDto,
Ticket,
TicketDto,
UpdateTicket,
UpdateTicketDto,
UploadTicketImageDto,
} from "./models";
import { TicketsService } from "./tickets.service";
/**
 * Shared `ValidationPipe` settings for the routes that accept a request body:
 * payloads are transformed (with implicit type conversion), unknown values are
 * rejected and failures are reported through `requestValidationErrorFactory`.
 */
const validationPipeOptions: ValidationPipeOptions = {
  forbidUnknownValues: true,
  transform: true,
  transformOptions: { enableImplicitConversion: true },
  exceptionFactory: requestValidationErrorFactory,
};
/**
 * Method decorator declaring the permission required by a route: the current
 * user must be in the `owners` relation of the ticket identified by the `:id`
 * route parameter.
 */
const IsTicketOwner = (): MethodDecorator => {
  const ownershipCheck = OryPermissionChecks((ctx) => {
    const request = ctx.switchToHttp().getRequest<FastifyRequest>();
    // The authenticated user is stored on the request under CURRENT_USER_KEY.
    const userId = request[`${CURRENT_USER_KEY}`]["id"];
    const ticketId = (request.params as { id: string }).id;
    const tuple = relationTupleBuilder()
      .subject(PermissionNamespaces[Resources.USERS], userId)
      .isIn("owners")
      .of(PermissionNamespaces[Resources.TICKETS], ticketId);
    return tuple.toString();
  });
  return applyDecorators(ownershipCheck);
};
/**
 * HTTP API for tickets: create, list, read, update, and upload/download of the
 * ticket image. All business logic is delegated to `TicketsService`.
 */
@Controller(Resources.TICKETS)
@ApiTags(Resources.TICKETS)
@ApiExtraModels(PaginatedDto)
export class TicketsController {
constructor(private readonly ticketsService: TicketsService) {}
/**
 * Creates a ticket owned by the current user.
 * Accepts either guard registered under ORY_AUTH_GUARD or ORY_OAUTH2_GUARD.
 */
@UseGuards(OrGuard([ORY_AUTH_GUARD, ORY_OAUTH2_GUARD]))
@UsePipes(new ValidationPipe(validationPipeOptions))
@ApiBearerAuth(SecurityRequirements.Bearer)
@ApiCookieAuth(SecurityRequirements.Session)
@ApiOperation({
description: "Request creation of a ticket",
summary: `Create a ticket - Scope : ${Resources.TICKETS}:${Actions.CREATE_ONE}`,
})
@ApiBody({ type: CreateTicketDto })
@ApiResponse({
status: HttpStatus.CREATED,
description: "Ticket created",
type: TicketDto,
})
@Post("")
create(
@Body() ticket: CreateTicket,
@CurrentUser() currentUser: User,
): Promise<Ticket> {
return this.ticketsService.create(ticket, currentUser);
}
/**
 * Returns a page of tickets matching the pagination query.
 * No guard is applied to this route.
 */
@UsePipes(
new ValidationPipe({
exceptionFactory: requestValidationErrorFactory,
transform: true,
transformOptions: { enableImplicitConversion: true },
// forbidUnknownValues: true, //! FIX issue with query parsing process
}),
)
@ApiOperation({
description: "Filter tickets",
summary: `Find tickets - Scope : ${Resources.TICKETS}:${Actions.READ_MANY}`,
})
@ApiNestedQuery(PaginateDto)
@ApiPaginatedDto(TicketDto, "Tickets found")
@Get("")
find(
@Query(ParseQuery) paginate: PaginateQuery,
): Promise<PaginatedDto<Ticket>> {
return this.ticketsService.find(paginate);
}
/**
 * Returns the ticket with the given id.
 * No guard is applied to this route.
 */
@ApiOperation({
description: "Request a ticket by id",
summary: `Find a ticket - Scope : ${Resources.TICKETS}:${Actions.READ_ONE}`,
})
@ApiResponse({
status: HttpStatus.OK,
description: "Ticket found",
type: TicketDto,
})
@Get(":id")
findById(@Param("id", ParseObjectId) id: string): Promise<Ticket> {
return this.ticketsService.findById(id);
}
/**
 * Updates a ticket. The caller must be authenticated and be an owner of the
 * ticket (see `IsTicketOwner`).
 */
// TODO: check permission for ticket orderId if present
@IsTicketOwner()
@UseGuards(
OrGuard([ORY_AUTH_GUARD, ORY_OAUTH2_GUARD]),
OryAuthorizationGuard(),
)
@UsePipes(new ValidationPipe(validationPipeOptions))
@ApiBearerAuth(SecurityRequirements.Bearer)
@ApiCookieAuth(SecurityRequirements.Session)
@ApiOperation({
description: "Update a ticket by id",
summary: `Update a ticket - Scope : ${Resources.TICKETS}:${Actions.UPDATE_ONE}`,
})
@ApiBody({ type: UpdateTicketDto })
@ApiResponse({
status: HttpStatus.OK,
description: "Ticket updated",
type: TicketDto,
})
@Patch(":id")
updateById(
@Param("id", ParseObjectId) id: string,
@Body() ticket: UpdateTicket,
): Promise<Ticket> {
return this.ticketsService.updateById(id, ticket);
}
/**
 * Uploads the image of a ticket from the `file` field of a multipart request.
 * The file is exposed as a stream (StreamStorage) and must be a JPEG of at
 * most 100000 bytes. The caller must be an owner of the ticket.
 * NOTE(review): unlike `updateById`, this route only uses
 * `OryAuthenticationGuard()` and not the OrGuard with the OAuth2 guard —
 * confirm this is intentional.
 */
@IsTicketOwner()
@UseGuards(OryAuthenticationGuard(), OryAuthorizationGuard())
@UseInterceptors(
FileInterceptor("file", {
storage: new StreamStorage(),
}),
)
@ApiBearerAuth(SecurityRequirements.Bearer)
@ApiCookieAuth(SecurityRequirements.Session)
@ApiOperation({
description: "Upload ticket image by id",
summary: `Upload ticket image - Scope : ${Resources.TICKETS}:${Actions.UPDATE_ONE}`,
})
@ApiConsumes("multipart/form-data")
@ApiBody({ type: UploadTicketImageDto })
@ApiResponse({
status: HttpStatus.OK,
description: "Ticket",
type: TicketDto,
})
@Patch(":id/image")
uploadTicketImage(
@Param("id", ParseObjectId) id: string,
@UploadedFile(
new ParseFilePipe({
validators: [
new MaxFileSizeValidator({ maxSize: 100000 }),
new FileTypeValidator({ fileType: "image/jpeg" }),
],
}),
)
file: StreamStorageFile,
): Promise<Ticket> {
return this.ticketsService.uploadTicketImage(id, file.stream);
}
/**
 * Streams the image of a ticket back to the client as an inline JPEG.
 * No guard is applied to this route.
 */
@ApiOperation({
description: "Download ticket image by id",
summary: `Download ticket image - Scope : ${Resources.TICKETS}:${Actions.READ_ONE}`,
})
@ApiResponse({
status: HttpStatus.OK,
description: "Ticket image",
content: {
"image/jpeg": {
schema: {
type: "string",
format: "binary",
},
},
},
})
@Get(":id/image")
async downloadTicketImage(
@Param("id", ParseObjectId) id: string,
): Promise<StreamableFile> {
const stream = await this.ticketsService.downloadTicketImage(id);
return new StreamableFile(stream, {
disposition: "inline",
type: "image/jpeg",
});
}
}
Note:
- The `StreamStorage` class is a custom storage provider that exposes the incoming file as a readable stream.
- The uploaded file is validated using the `ParseFilePipe` with the built-in validators `MaxFileSizeValidator` and `FileTypeValidator` to ensure the file size and type are correct. We only accept JPEG images with a maximum size of 100 KB (`maxSize: 100000` bytes).
- The `StreamableFile` class is a built-in NestJS class that handles file downloads with both Fastify and Express.
- This is optional, but it's always a good idea to document your API with Swagger. Have a look at the docs.
Integrate the FileStorageService
to handle the file upload and download
Solution
// apps/tickets/src/app/tickets/tickets.service.ts
import { OryRelationshipsService } from "@getlarge/keto-client-wrapper";
import {
createRelationQuery,
relationTupleBuilder,
} from "@getlarge/keto-relations-parser";
import { FileStorageService } from "@getlarge/nestjs-tools-file-storage";
import {
BadRequestException,
Inject,
Injectable,
Logger,
NotFoundException,
} from "@nestjs/common";
import { ClientProxy } from "@nestjs/microservices";
import { InjectModel } from "@nestjs/mongoose";
import { Relationship } from "@ory/client";
import {
OrderCancelledEvent,
OrderCreatedEvent,
Patterns,
TicketCreatedEvent,
TicketUpdatedEvent,
} from "@ticketing/microservices/shared/events";
import {
NextPaginationDto,
PaginateDto,
PermissionNamespaces,
} from "@ticketing/microservices/shared/models";
import { transactionManager } from "@ticketing/microservices/shared/mongo";
import { Resources } from "@ticketing/shared/constants";
import { isErrorResponse, RetriableError } from "@ticketing/shared/errors";
import { User } from "@ticketing/shared/models";
import { Model } from "mongoose";
import { Paginator } from "nestjs-keyset-paginator";
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
import {
catchError,
lastValueFrom,
retry,
throwError,
timeout,
timer,
} from "rxjs";
import { MODERATIONS_CLIENT, ORDERS_CLIENT } from "../shared/constants";
import { CreateTicket, Ticket, UpdateTicket } from "./models";
import { Ticket as TicketSchema, TicketDocument } from "./schemas";
/**
 * Application service for tickets.
 *
 * Persists tickets with Mongoose, records ticket ownership in Ory, notifies
 * the orders and moderation services over their client proxies, and stores
 * ticket images through `FileStorageService`.
 */
@Injectable()
export class TicketsService {
  readonly logger = new Logger(TicketsService.name);

  constructor(
    @InjectModel(TicketSchema.name)
    private readonly ticketModel: Model<TicketDocument>,
    @Inject(OryRelationshipsService)
    private readonly oryRelationshipsService: OryRelationshipsService,
    @Inject(ORDERS_CLIENT) private readonly ordersClient: ClientProxy,
    @Inject(MODERATIONS_CLIENT) private readonly moderationClient: ClientProxy,
    @Inject(FileStorageService)
    private readonly fileStorageService: FileStorageService,
  ) {}

  /**
   * Creates a ticket owned by `currentUser`.
   *
   * Inside one transaction wrapper: stores the ticket, creates the `owners`
   * relation for the user, then sends `TicketCreated` to the moderation
   * service and waits for its reply.
   *
   * @param ticket - ticket attributes supplied by the client
   * @param currentUser - user who becomes the owner
   * @returns the created ticket
   * @throws the error reported by the transaction wrapper; the relation
   *   created in Ory, if any, is deleted before the error is rethrown
   */
  async create(ticket: CreateTicket, currentUser: User): Promise<Ticket> {
    let createdRelation: Relationship | undefined;
    try {
      await using manager = await transactionManager(this.ticketModel);
      const res = await manager.wrap<Ticket>(async (session) => {
        const doc: CreateTicket & { userId: string } = {
          ...ticket,
          userId: currentUser.id,
        };
        const docs = await this.ticketModel.create([doc], {
          session,
        });
        const newTicket = docs[0].toJSON<Ticket>();
        this.logger.debug(`Created ticket ${newTicket.id}`);

        const relationTuple = relationTupleBuilder()
          .subject(PermissionNamespaces[Resources.USERS], currentUser.id)
          .isIn("owners")
          .of(PermissionNamespaces[Resources.TICKETS], newTicket.id);
        const createRelationshipBody = createRelationQuery(
          relationTuple.toJSON(),
        ).unwrapOrThrow();
        const { data } = await this.oryRelationshipsService.createRelationship({
          createRelationshipBody,
        });
        // Remembered outside the callback so the catch block can undo it.
        createdRelation = data;
        this.logger.debug(`Created relation ${relationTuple.toString()}`);

        await lastValueFrom(
          this.moderationClient
            .send<
              TicketCreatedEvent["name"],
              TicketCreatedEvent["data"]
            >(Patterns.TicketCreated, newTicket)
            .pipe(
              retry({
                count: 5,
                delay: (error: Error, retryCount: number) => {
                  const scalingDuration = 500;
                  // Only retriable errors are retried, with a delay growing
                  // linearly with the attempt number; others fail immediately.
                  if (
                    isErrorResponse(error) &&
                    error.name === RetriableError.name
                  ) {
                    this.logger.debug(`retry attempt #${retryCount}`);
                    return timer(retryCount * scalingDuration);
                  }
                  throw error;
                },
              }),
              timeout(8000),
              catchError((err) => {
                this.logger.error(err);
                return throwError(() => err);
              }),
            ),
        );
        this.logger.debug(`Sent event ${Patterns.TicketCreated}`);
        return newTicket;
      });
      if (res.error) {
        throw res.error;
      }
      return res.value;
    } catch (error) {
      if (createdRelation) {
        await this.oryRelationshipsService.deleteRelationships(createdRelation);
      }
      throw error;
    }
  }

  /**
   * Runs a keyset-paginated query on the ticket collection.
   *
   * @param params - pagination options; `skip` defaults to 0 and `limit` to 10
   * @returns the matching documents and the key to request the next page
   */
  paginate(params: PaginateDto = {}): Promise<{
    docs: TicketDocument[];
    next_key: { key: string; value: string }[];
  }> {
    const { skip = 0, limit = 10, sort = undefined } = params;
    // TODO: create a PR in nestjs-keyset-paginator to add document types
    return new Paginator().paginate(
      this.ticketModel,
      skip,
      limit,
      params.start_key,
      sort?.field,
      sort?.order,
      params.filter,
      params.projection,
    );
  }

  /**
   * Returns a page of tickets serialised with `toJSON`.
   *
   * @param params - pagination options forwarded to `paginate`
   */
  async find(
    params: PaginateDto = {},
  ): Promise<{ results: Ticket[]; next: NextPaginationDto[] }> {
    const paginatedResult = await this.paginate(params);
    const results = paginatedResult.docs.map((ticket) =>
      ticket.toJSON<Ticket>(),
    );
    return { results, next: paginatedResult.next_key };
  }

  /**
   * Returns the ticket with the given id.
   *
   * @throws NotFoundException when no ticket has this id
   */
  async findById(id: string): Promise<Ticket> {
    const ticket = await this.ticketModel.findOne({ _id: id });
    if (!ticket?.id) {
      throw new NotFoundException(`Ticket ${id} not found`);
    }
    return ticket.toJSON<Ticket>();
  }

  /**
   * @description this method is used to update the status of a ticket internally only
   * @throws NotFoundException when no ticket has this id
   */
  async updateStatusById(
    id: string,
    status: Ticket["status"],
  ): Promise<Ticket> {
    const ticket = await this.ticketModel.findOne({ _id: id });
    if (!ticket?.id) {
      throw new NotFoundException(`Ticket ${id} not found`);
    }
    ticket.set({ status });
    await ticket.save();
    return ticket.toJSON<Ticket>();
  }

  /**
   * Applies `update` to a ticket and sends `TicketUpdated` to the orders
   * service, all inside one transaction wrapper.
   *
   * @throws NotFoundException when no ticket has this id
   * @throws BadRequestException when the ticket is attached to an order
   */
  async updateById(id: string, update: UpdateTicket): Promise<Ticket> {
    await using manager = await transactionManager(this.ticketModel);
    const result = await manager.wrap(async (session) => {
      const ticket = await this.ticketModel
        .findOne({ _id: id })
        .session(session);
      // A missing ticket is the not-found case (same check as findById).
      if (!ticket?.id) {
        throw new NotFoundException(`Ticket ${id} not found`);
      } else if (ticket.orderId) {
        throw new BadRequestException(`Ticket ${id} is currently reserved`);
      }
      ticket.set(update);
      await ticket.save({ session });
      const updatedTicket = ticket.toJSON<Ticket>();
      await lastValueFrom(
        this.ordersClient
          .send<
            TicketUpdatedEvent["name"],
            TicketUpdatedEvent["data"]
          >(Patterns.TicketUpdated, updatedTicket)
          .pipe(timeout(5000)),
      );
      return updatedTicket;
    });
    if (result.error) {
      this.logger.error(result.error);
      throw result.error;
    }
    return result.value;
  }

  /**
   * Attaches the order described by `event` to its ticket and sends
   * `TicketUpdated` to the orders service.
   *
   * @throws NotFoundException when the ticket of the order does not exist
   */
  async createOrder(event: OrderCreatedEvent["data"]): Promise<Ticket> {
    const ticketId = event.ticket.id;
    const orderId = event.id;
    await using manager = await transactionManager(this.ticketModel);
    const result = await manager.wrap(async (session) => {
      const ticket = await this.ticketModel
        .findOne({ _id: ticketId })
        .session(session);
      if (!ticket?.id) {
        throw new NotFoundException(`Ticket ${ticketId} not found`);
      }
      ticket.set({ orderId });
      await ticket.save({ session });
      const updatedTicket = ticket.toJSON<Ticket>();
      await lastValueFrom(
        this.ordersClient
          .send<
            TicketUpdatedEvent["name"],
            TicketUpdatedEvent["data"]
          >(Patterns.TicketUpdated, updatedTicket)
          .pipe(timeout(5000)),
      );
      return updatedTicket;
    });
    if (result.error) {
      this.logger.error(result.error);
      throw result.error;
    }
    return result.value;
  }

  /**
   * Detaches the order from the ticket referenced by `event` and sends
   * `TicketUpdated` to the orders service.
   *
   * @throws NotFoundException when the ticket of the order does not exist
   */
  async cancelOrder(event: OrderCancelledEvent["data"]): Promise<Ticket> {
    const ticketId = event.ticket.id;
    await using manager = await transactionManager(this.ticketModel);
    const result = await manager.wrap(async (session) => {
      const ticket = await this.ticketModel
        .findOne({ _id: ticketId })
        .session(session);
      if (!ticket?.id) {
        throw new NotFoundException(`Ticket ${ticketId} not found`);
      }
      ticket.set({ orderId: undefined });
      await ticket.save({ session: manager.session });
      const updatedTicket = ticket.toJSON<Ticket>();
      await lastValueFrom(
        this.ordersClient
          .send<
            TicketUpdatedEvent["name"],
            TicketUpdatedEvent["data"]
          >(Patterns.TicketUpdated, updatedTicket)
          .pipe(timeout(5000)),
      );
      return updatedTicket;
    });
    if (result.error) {
      this.logger.error(result.error);
      throw result.error;
    }
    return result.value;
  }

  /** Storage key under which the image of ticket `id` is kept. */
  private ticketFilePath(id: string): string {
    return `tickets/${id}`;
  }

  /**
   * Stores `stream` as the image of the ticket.
   *
   * @returns the ticket the image belongs to
   * @throws NotFoundException when no ticket has this id (checked before any
   *   data is written)
   */
  async uploadTicketImage(id: string, stream: Readable): Promise<Ticket> {
    const ticket = await this.findById(id);
    const filePath = this.ticketFilePath(id);
    const writeStream = await this.fileStorageService.uploadStream({
      filePath,
    });
    // Resolves once the whole upload has been piped into the storage stream.
    await pipeline(stream, writeStream);
    return ticket;
  }

  /** Opens a readable stream on the stored image of ticket `id`. */
  downloadTicketImage(id: string): Promise<Readable> {
    const filePath = this.ticketFilePath(id);
    return this.fileStorageService.downloadStream({ filePath });
  }
}
Update the supported environment variables in the `EnvironmentVariables` class so that the application can select the right storage provider based on `process.env.NODE_ENV`.
Solution
// apps/tickets/src/app/env/index.ts
import { ConfigService } from "@nestjs/config";
import {
BaseEnvironmentVariables,
JWTEnvironmentVariables,
MongoEnvironmentVariables,
OryHydraEnvironmentVariables,
OryKetoEnvironmentVariables,
OryKratosEnvironmentVariables,
RmqEnvironmentVariables,
} from "@ticketing/microservices/shared/env";
import { Environment } from "@ticketing/shared/constants";
import { Exclude, Expose } from "class-transformer";
import { IsString, ValidateIf } from "class-validator";
import { readFileSync } from "node:fs";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import { Mixin } from "ts-mixer";
/** `ConfigService` typed against this application's `EnvironmentVariables`. */
export type AppConfigService = ConfigService<EnvironmentVariables, true>;
// Directory of this module, rebuilt from `import.meta.url`.
const __dirname = dirname(fileURLToPath(import.meta.url));
// package.json located five directories above this file — presumably the workspace root; confirm.
const pkgPath = join(__dirname, "..", "..", "..", "..", "..", "package.json");
/** Groups the Hydra, Keto and Kratos environment variable classes into one mixin. */
class OryEnvironmentVariables extends Mixin(
OryHydraEnvironmentVariables,
OryKetoEnvironmentVariables,
OryKratosEnvironmentVariables,
) {}
/**
 * Environment variables supported by the tickets application.
 *
 * The S3 settings are validated in every environment except development,
 * while `STORAGE_PATH` is validated in development only.
 */
export class EnvironmentVariables extends Mixin(
BaseEnvironmentVariables,
JWTEnvironmentVariables,
MongoEnvironmentVariables,
RmqEnvironmentVariables,
OryEnvironmentVariables,
) {
// Parsed package.json, read synchronously when the class is instantiated.
@Exclude()
private pkg: { [key: string]: unknown; name?: string; version?: string } =
JSON.parse(
// eslint-disable-next-line security/detect-non-literal-fs-filename
readFileSync(pkgPath, "utf8"),
);
APP_NAME?: string = "tickets";
// Falls back to "0.0.1" when package.json has no version.
APP_VERSION?: string = this.pkg?.version || "0.0.1";
// S3 settings: validated unless NODE_ENV is development.
@Expose()
@ValidateIf((o) => o.NODE_ENV !== Environment.Development)
@IsString()
AWS_S3_BUCKET?: string;
@Expose()
@ValidateIf((o) => o.NODE_ENV !== Environment.Development)
@IsString()
AWS_S3_REGION?: string;
@Expose()
@ValidateIf((o) => o.NODE_ENV !== Environment.Development)
@IsString()
AWS_S3_SECRET_ACCESS_KEY?: string;
@Expose()
@ValidateIf((o) => o.NODE_ENV !== Environment.Development)
@IsString()
AWS_S3_ACCESS_KEY_ID?: string;
// Local storage directory: validated only when NODE_ENV is development.
@Expose()
@ValidateIf((o) => o.NODE_ENV === Environment.Development)
@IsString()
STORAGE_PATH?: string;
}
And set the environment variables in the apps/tickets/.env
file
#...
STORAGE_PATH=apps/tickets/store
# ...
Feature flags, also known as feature toggles, are a software development technique that allows developers to turn features on or off in their application, even after the deployment of the code to production. In other words, they do not require triggering a new deployment.
Feature flags provide a way to test new features in production without making them visible to all users. They can also be used for A/B testing, canary releases, and gradual rollouts to minimize risk and negative impact on the user base.
- Define - Create a list of features that can be toggled on or off.
- Declare - Enable feature flags in the environment variables and use them in the code (`FEATURE_FLAGS=feature1,feature2,feature3`, where each feature should be known and validated).
- Control - Create a `FeatureFlagsService` that will be responsible for checking whether a feature is enabled.
- Protect - Use a `Guard` (with Reflection API and decorator) to short-circuit the request if the feature is disabled.
For this exercise, we will apply the feature flags technique on the ticket upload/download feature implemented in the previous exercise.
Define supported feature flags as a shared enum that the microservices and frontend applications could consume.
// libs/shared/constants/src/params.ts
/** Runtime environments an application can be started in (values of `NODE_ENV`). */
export enum Environment {
Development = "development",
DockerDevelopment = "docker_development",
Production = "production",
Staging = "staging",
Test = "test",
}
/** Supported log levels. */
export enum LogLevel {
Warn = "warn",
Error = "error",
Silent = "silent",
Debug = "debug",
Info = "info",
}
/** Names of the features that can be toggled on or off. */
export enum FeatureFlags {
TICKET_IMAGE_UPLOAD = "TICKET_IMAGE_UPLOAD",
}
/**
 * Common durations expressed in seconds.
 * A month is counted as 30 days and a year as 365 days.
 */
export enum DelayInSeconds {
  ONE_MINUTE = 60,
  ONE_HOUR = 3600, // 60 minutes
  ONE_DAY = 86400, // 24 hours
  ONE_WEEK = 604800, // 7 days
  ONE_MONTH = 2592000, // 30 days
  ONE_YEAR = 31536000, // 365 days
}
/** Media types referenced by the applications. */
export enum MimeType {
APPLICATION_JSON = "application/json",
APPLICATION_PDF = "application/pdf",
MULTIPART_FORM_DATA = "multipart/form-data",
IMAGE_PNG = "image/png",
TEXT_HTML = "text/html",
}
/** Name of the HTTP header carrying the version. */
export const VERSION_HEADER_NAME = "X-Version";
/** Name of the HTTP header carrying the access token. */
export const AUTHORIZATION_HEADER_NAME = "X-Access-Token";
/** Matches the first integer without a leading zero; intended for the major part of a semver string. */
export const MAJOR_SEMVER_REGEX = /0|[1-9]\d*/;
Declare the FEATURE_FLAGS
environment variable.
// apps/tickets/src/app/env/index.ts
import { ConfigService } from "@nestjs/config";
import {
BaseEnvironmentVariables,
JWTEnvironmentVariables,
MongoEnvironmentVariables,
OryHydraEnvironmentVariables,
OryKetoEnvironmentVariables,
OryKratosEnvironmentVariables,
RmqEnvironmentVariables,
} from "@ticketing/microservices/shared/env";
import { Environment, FeatureFlags } from "@ticketing/shared/constants";
import { Exclude, Expose } from "class-transformer";
import { IsEnum, IsOptional, IsString, ValidateIf } from "class-validator";
import { readFileSync } from "node:fs";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import { Mixin } from "ts-mixer";
/** `ConfigService` typed against this application's `EnvironmentVariables`. */
export type AppConfigService = ConfigService<EnvironmentVariables, true>;
// Directory of this module, rebuilt from `import.meta.url`.
const __dirname = dirname(fileURLToPath(import.meta.url));
// package.json located five directories above this file — presumably the workspace root; confirm.
const pkgPath = join(__dirname, "..", "..", "..", "..", "..", "package.json");
/** Groups the Hydra, Keto and Kratos environment variable classes into one mixin. */
class OryEnvironmentVariables extends Mixin(
OryHydraEnvironmentVariables,
OryKetoEnvironmentVariables,
OryKratosEnvironmentVariables,
) {}
/**
 * Environment variables supported by the tickets application, including the
 * optional list of enabled feature flags.
 *
 * The S3 settings are validated in every environment except development,
 * while `STORAGE_PATH` is validated in development only.
 */
export class EnvironmentVariables extends Mixin(
BaseEnvironmentVariables,
JWTEnvironmentVariables,
MongoEnvironmentVariables,
RmqEnvironmentVariables,
OryEnvironmentVariables,
) {
// Parsed package.json, read synchronously when the class is instantiated.
@Exclude()
private pkg: { [key: string]: unknown; name?: string; version?: string } =
JSON.parse(
// eslint-disable-next-line security/detect-non-literal-fs-filename
readFileSync(pkgPath, "utf8"),
);
APP_NAME?: string = "tickets";
// Falls back to "0.0.1" when package.json has no version.
APP_VERSION?: string = this.pkg?.version || "0.0.1";
// Enabled feature flags; optional, and every entry must be a FeatureFlags member.
// NOTE(review): no transform is declared here, so a comma-separated
// FEATURE_FLAGS string is presumably split into an array elsewhere — confirm.
@Expose()
@IsOptional()
@IsEnum(FeatureFlags, { each: true })
FEATURE_FLAGS?: FeatureFlags[] = [];
// S3 settings: validated unless NODE_ENV is development.
@Expose()
@ValidateIf((o) => o.NODE_ENV !== Environment.Development)
@IsString()
AWS_S3_BUCKET?: string;
@Expose()
@ValidateIf((o) => o.NODE_ENV !== Environment.Development)
@IsString()
AWS_S3_REGION?: string;
@Expose()
@ValidateIf((o) => o.NODE_ENV !== Environment.Development)
@IsString()
AWS_S3_SECRET_ACCESS_KEY?: string;
@Expose()
@ValidateIf((o) => o.NODE_ENV !== Environment.Development)
@IsString()
AWS_S3_ACCESS_KEY_ID?: string;
// Local storage directory: validated only when NODE_ENV is development.
@Expose()
@ValidateIf((o) => o.NODE_ENV === Environment.Development)
@IsString()
STORAGE_PATH?: string;
}
Generate the library
npx nx g @nx/nest:lib microservices-shared-feature-flags --directory libs/microservices/shared/feature-flags \
--importPath @ticketing/microservices/shared/feature-flags --tags scope:shared,type:lib,platform:server
Define the interfaces and constants to configure the FeatureFlagsModule
.
Solution
// libs/microservices/shared/feature-flags/src/lib/feature-flags.interfaces.ts
import type { InjectionToken, Type } from "@nestjs/common";
import type { FeatureFlags } from "@ticketing/shared/constants";
/** Options of the FeatureFlagsModule. */
export interface FeatureFlagsModuleOptions {
/** Feature flags that are enabled. */
flags: FeatureFlags[];
}
/** Contract for a class able to produce the module options, synchronously or not. */
export interface FeatureFlagsModuleOptionsFactory {
createFeatureFlagsOptions():
| Promise<FeatureFlagsModuleOptions>
| FeatureFlagsModuleOptions;
}
/** Options accepted when the module options are resolved through a factory. */
export interface FeatureFlagsModuleAsyncOptions {
/** Modules to import alongside the feature flags module. */
imports?: Type<unknown>[];
/** Factory returning the module options; receives the providers listed in `inject`. */
useFactory?: (
...args: unknown[]
) => Promise<FeatureFlagsModuleOptions> | FeatureFlagsModuleOptions;
/** Injection tokens resolved and passed to `useFactory`. */
inject?: InjectionToken[];
}
// libs/microservices/shared/feature-flags/src/lib/feature-flags.constants.ts
/**
 * Injection token under which the feature flags module options are provided.
 * NOTE(review): "OPTONS" looks like a misspelling of "OPTIONS"; the name is
 * exported, so renaming it must be done together with every import.
 */
export const FEATURE_FLAGS_OPTONS = Symbol("__FEATURE_FLAGS_OPTONS__");
Create the FeatureFlagsModule
Solution
// libs/microservices/shared/feature-flags/src/lib/feature-flags.module.ts
import { DynamicModule, Module, Provider } from "@nestjs/common";
import { FEATURE_FLAGS_OPTONS } from "./feature-flags.constants";
import type {
FeatureFlagsModuleAsyncOptions,
FeatureFlagsModuleOptions,
} from "./feature-flags.interfaces";
import { FeatureFlagsService } from "./feature-flags.service";
/**
 * Dynamic module exposing `FeatureFlagsService`.
 * The options are supplied either directly (`forRoot`) or through a factory
 * (`forRootAsync`).
 */
@Module({})
export class FeatureFlagsModule {
  /**
   * Registers the module with statically known options.
   *
   * @param options - enabled feature flags
   */
  static forRoot(options: FeatureFlagsModuleOptions): DynamicModule {
    const optionsProvider: Provider = {
      provide: FEATURE_FLAGS_OPTONS,
      useValue: options,
    };
    return {
      module: FeatureFlagsModule,
      providers: [optionsProvider, FeatureFlagsService],
      exports: [FeatureFlagsService],
    };
  }

  /**
   * Registers the module with options produced by `options.useFactory`.
   *
   * @param options - factory, its injected dependencies and extra imports
   * @throws Error when `useFactory` is not provided
   */
  static forRootAsync(options: FeatureFlagsModuleAsyncOptions): DynamicModule {
    const imports = options.imports ?? [];
    const optionsProvider = this.createAsyncOptionsProvider(options);
    return {
      module: FeatureFlagsModule,
      imports,
      providers: [optionsProvider, FeatureFlagsService],
      exports: [FeatureFlagsService],
    };
  }

  /** Builds the provider that resolves the module options from the factory. */
  private static createAsyncOptionsProvider(
    options: FeatureFlagsModuleAsyncOptions,
  ): Provider {
    if (!options.useFactory) {
      throw new Error(
        "Invalid FeatureFlagsModuleAsyncOptions. Must provide useFactory",
      );
    }
    return {
      provide: FEATURE_FLAGS_OPTONS,
      useFactory: options.useFactory,
      inject: options.inject ?? [],
    };
  }
}
Implement the logic to check if a feature is enabled in the FeatureFlagsService
.
Solution
// libs/microservices/shared/feature-flags/src/lib/feature-flags.service.ts
import { Inject, Injectable } from "@nestjs/common";
import { FeatureFlags } from "@ticketing/shared/constants";
import { FEATURE_FLAGS_OPTONS } from "./feature-flags.constants";
import type { FeatureFlagsModuleOptions } from "./feature-flags.interfaces";
/** Answers whether a feature flag is part of the configured module options. */
@Injectable()
export class FeatureFlagsService {
  constructor(
    @Inject(FEATURE_FLAGS_OPTONS)
    private featureFlagsOptions: FeatureFlagsModuleOptions,
  ) {}

  /** Returns the configured flags. */
  getAll(): FeatureFlags[] {
    const { flags } = this.featureFlagsOptions;
    return flags;
  }

  /**
   * Looks a flag up by name.
   *
   * @returns the flag when it is configured, `undefined` otherwise
   */
  getByName(name: string): string | undefined {
    const configuredFlags = this.getAll();
    return configuredFlags.find((candidate) => candidate === name);
  }

  /** Tells whether the flag called `name` is configured. */
  isEnabled(name: string): boolean {
    return Boolean(this.getByName(name));
  }
}
Create a FeatureFlagsGuard
to short-circuit the request if the feature is disabled.
Solution
// libs/microservices/shared/feature-flags/src/lib/feature-flags.guard.ts
import {
CanActivate,
ExecutionContext,
Injectable,
mixin,
NotFoundException,
Type,
} from "@nestjs/common";
import { FeatureFlags } from "@ticketing/shared/constants";
import { FeatureFlagsService } from "./feature-flags.service";
/**
 * Creates a guard class that only lets a request through when
 * `featureFlagName` is enabled.
 *
 * @param featureFlagName - flag protecting the route
 * @returns a guard whose `canActivate` throws `NotFoundException`
 *   ("Cannot <method> <url>") when the flag is disabled
 */
export function FeatureFlagsGuard(
  featureFlagName: FeatureFlags,
): Type<CanActivate> {
  @Injectable()
  class Guard implements CanActivate {
    constructor(private readonly featureFlagsService: FeatureFlagsService) {}

    canActivate(context: ExecutionContext): boolean {
      if (this.featureFlagsService.isEnabled(featureFlagName)) {
        return true;
      }
      // Disabled feature: answer with a 404 naming the requested method and URL.
      const { method, url } = context.switchToHttp().getRequest();
      throw new NotFoundException(`Cannot ${method} ${url}`);
    }
  }
  return mixin(Guard);
}
Configure the `FeatureFlagsModule` in the `TicketsModule`.
Solution
// apps/tickets/src/app/tickets/tickets.module.ts
import { OryOAuth2Module } from "@getlarge/hydra-client-wrapper";
import {
OryPermissionsModule,
OryRelationshipsModule,
} from "@getlarge/keto-client-wrapper";
import { OryFrontendModule } from "@getlarge/kratos-client-wrapper";
import { AmqpClient, AmqpOptions } from "@getlarge/nestjs-tools-amqp-transport";
import {
FileStorageLocal,
FileStorageLocalSetup,
FileStorageModule,
FileStorageS3,
FileStorageS3Setup,
MethodTypes,
} from "@getlarge/nestjs-tools-file-storage";
import { Module } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { APP_FILTER } from "@nestjs/core";
import {
ClientsModule,
CustomClientOptions,
Transport,
} from "@nestjs/microservices";
import { MongooseModule } from "@nestjs/mongoose";
import { FeatureFlagsModule } from "@ticketing/microservices/shared/feature-flags";
import { GlobalErrorFilter } from "@ticketing/microservices/shared/filters";
import {
OryAuthenticationGuard,
OryOAuth2AuthenticationGuard,
} from "@ticketing/microservices/shared/guards";
import { getReplyQueueName } from "@ticketing/microservices/shared/rmq";
import { Environment, Services } from "@ticketing/shared/constants";
import { updateIfCurrentPlugin } from "mongoose-update-if-current";
import { mkdir, readdir } from "node:fs/promises";
import path from "node:path";
import { AppConfigService, EnvironmentVariables } from "../env";
import {
MODERATIONS_CLIENT,
ORDERS_CLIENT,
ORY_AUTH_GUARD,
ORY_OAUTH2_GUARD,
} from "../shared/constants";
import { Ticket, TicketSchema } from "./schemas/ticket.schema";
import { TicketsController } from "./tickets.controller";
import { TicketsService } from "./tickets.service";
import { TicketsMSController } from "./tickets-ms.controller";
/**
 * Mongoose feature registration for the Ticket model.
 * The `updateIfCurrentPlugin` is attached to the schema inside the factory.
 */
const MongooseFeatures = MongooseModule.forFeatureAsync([
  {
    name: Ticket.name,
    inject: [ConfigService],
    useFactory() {
      TicketSchema.plugin(updateIfCurrentPlugin);
      return TicketSchema;
    },
  },
]);
/**
 * Builds the custom AMQP client options used to talk to `consumerService`.
 *
 * @param configService - source of `RMQ_URL` and `RMQ_PREFETCH_COUNT`
 * @param consumerService - target service; its name prefixes the queue name
 * @returns client options that use `AmqpClient` as the custom class
 */
const clientFactory = (
  configService: AppConfigService,
  consumerService: Services,
): CustomClientOptions => {
  const brokerUrl = configService.get("RMQ_URL") as string;
  const prefetchCount = configService.get("RMQ_PREFETCH_COUNT");
  const queue = `${consumerService}_QUEUE`;
  const replyQueue = getReplyQueueName(
    consumerService,
    Services.TICKETS_SERVICE,
  );

  const options: AmqpOptions = {
    urls: [brokerUrl],
    persistent: true,
    noAck: true,
    prefetchCount,
    isGlobalPrefetchCount: false,
    queue,
    replyQueue,
    queueOptions: {
      durable: true,
      exclusive: false,
      autoDelete: false,
    },
    socketOptions: {
      keepAlive: true,
      heartbeatIntervalInSeconds: 30,
      reconnectTimeInSeconds: 1,
    },
  };

  return { customClass: AmqpClient, options };
};
/**
 * Microservice clients registered for this module: one for the orders
 * service and one for the moderation service, both built by `clientFactory`.
 */
const Clients = ClientsModule.registerAsync([
  {
    name: ORDERS_CLIENT,
    useFactory: (configService: AppConfigService) => ({
      ...clientFactory(configService, Services.ORDERS_SERVICE),
      transport: Transport.RMQ,
    }),
    inject: [ConfigService],
  },
  {
    name: MODERATIONS_CLIENT,
    useFactory: (configService: AppConfigService) => ({
      ...clientFactory(configService, Services.MODERATION_SERVICE),
      transport: Transport.RMQ,
    }),
    inject: [ConfigService],
  },
]);
/**
 * Maps a caller-supplied file name onto a path located inside `root`.
 *
 * Leading `../` (or `..\`) segments left after normalization are stripped, the
 * remainder is joined onto `root`, and the result is verified to still live
 * under `root`.
 *
 * The containment check compares path segments (via `path.relative`) instead of
 * raw string prefixes, so a sibling such as `/srv/data-evil` is never mistaken
 * for a child of `/srv/data`, and a non-normalized `root` (e.g. `./storage`)
 * does not cause valid files to be rejected.
 *
 * @param fileName - untrusted, possibly relative file name
 * @param root - directory the result must stay within
 * @returns `root` joined with the sanitized file name
 * @throws Error("Invalid path") when `fileName` contains a NUL byte or the
 *   joined path would fall outside `root`
 */
const sanitizePath = (fileName: string, root: string): string => {
  // Reject embedded NUL bytes outright.
  if (fileName.includes("\0")) {
    throw new Error("Invalid path");
  }
  // After normalization, any remaining ".." segments can only sit at the front.
  const safeInput = path.normalize(fileName).replace(/^(\.\.(\/|\\|$))+/, "");
  const absoluteFilepath = path.join(root, safeInput);

  // Defense in depth: the path from root to the target must not climb upwards
  // or be absolute, which would mean it sits outside of root.
  const relativeToRoot = path.relative(root, absoluteFilepath);
  const escapesRoot =
    relativeToRoot === ".." ||
    relativeToRoot.startsWith(`..${path.sep}`) ||
    path.isAbsolute(relativeToRoot);
  if (escapesRoot) {
    throw new Error("Invalid path");
  }
  return absoluteFilepath;
};
/**
 * Tickets feature module.
 *
 * Wires together the Ticket Mongoose model, the RMQ clients, feature flags,
 * file storage (local filesystem in development, S3 otherwise) and the Ory
 * client modules, and exposes the HTTP and microservice controllers.
 */
@Module({
  imports: [
    MongooseFeatures,
    Clients,
    // Feature flags are read from the FEATURE_FLAGS configuration entry.
    FeatureFlagsModule.forRootAsync({
      inject: [ConfigService],
      useFactory: (configService: AppConfigService) => ({
        flags: configService.get("FEATURE_FLAGS"),
      }),
    }),
    FileStorageModule.forRootAsync({
      inject: [ConfigService],
      useFactory: (configService: AppConfigService) => {
        const environment = configService.get("NODE_ENV", { infer: true });
        if (environment === Environment.Development) {
          // Development: store files on the local filesystem.
          const setup: FileStorageLocalSetup = {
            storagePath: configService.get("STORAGE_PATH"),
            maxPayloadSize: configService.get("MAX_PAYLOAD_SIZE"),
          };
          /**
           * Resolves the on-disk location of `fileName` under the storage root.
           * For write operations the parent directory is created when missing.
           */
          const filePath = async (options: {
            _req?: Request;
            fileName: string;
            methodType: MethodTypes;
          }): Promise<string> => {
            const { fileName, methodType } = options;
            const root = path.resolve(setup.storagePath);
            const absoluteFilepath = sanitizePath(fileName, root);
            if (methodType !== MethodTypes.WRITE) {
              return absoluteFilepath;
            }
            const { dir } = path.parse(absoluteFilepath);
            try {
              // eslint-disable-next-line security/detect-non-literal-fs-filename
              await readdir(dir);
            } catch (error: unknown) {
              // The caught value may be anything, so verify its shape before
              // reading `code`. A structural check is used rather than
              // `instanceof Error`, which is unreliable across VM realms.
              const isMissingDirectory =
                typeof error === "object" &&
                error !== null &&
                "code" in error &&
                error.code === "ENOENT";
              if (!isMissingDirectory) {
                throw error;
              }
              // eslint-disable-next-line security/detect-non-literal-fs-filename
              await mkdir(dir, { recursive: true });
            }
            return absoluteFilepath;
          };
          return new FileStorageLocal(setup, () => {
            return {
              filePath,
              // NOTE(review): presumably MAX_PAYLOAD_SIZE is expressed in
              // mebibytes and converted to bytes here; confirm.
              limits: { fileSize: setup.maxPayloadSize * 1024 * 1024 },
            };
          });
        }
        // Any other environment: store files in S3.
        const setup: FileStorageS3Setup = {
          maxPayloadSize: configService.get("MAX_PAYLOAD_SIZE"),
          bucket: configService.get("AWS_S3_BUCKET"),
          region: configService.get("AWS_S3_REGION"),
          credentials: {
            accessKeyId: configService.get("AWS_S3_ACCESS_KEY_ID"),
            secretAccessKey: configService.get("AWS_S3_SECRET_ACCESS_KEY"),
          },
        };
        return new FileStorageS3(setup);
      },
    }),
    OryFrontendModule.forRootAsync({
      inject: [ConfigService],
      useFactory: (
        configService: ConfigService<EnvironmentVariables, true>,
      ) => ({
        basePath: configService.get("ORY_KRATOS_PUBLIC_URL"),
      }),
    }),
    OryPermissionsModule.forRootAsync({
      inject: [ConfigService],
      useFactory: (
        configService: ConfigService<EnvironmentVariables, true>,
      ) => ({
        basePath: configService.get("ORY_KETO_PUBLIC_URL"),
      }),
    }),
    OryRelationshipsModule.forRootAsync({
      inject: [ConfigService],
      useFactory: (
        configService: ConfigService<EnvironmentVariables, true>,
      ) => ({
        accessToken: configService.get("ORY_KETO_API_KEY"),
        basePath: configService.get("ORY_KETO_ADMIN_URL"),
      }),
    }),
    OryOAuth2Module.forRootAsync({
      inject: [ConfigService],
      useFactory: (
        configService: ConfigService<EnvironmentVariables, true>,
      ) => ({
        basePath: configService.get("ORY_HYDRA_PUBLIC_URL"),
        accessToken: configService.get("ORY_HYDRA_API_KEY"),
      }),
    }),
  ],
  controllers: [TicketsController, TicketsMSController],
  providers: [
    {
      provide: APP_FILTER,
      useExisting: GlobalErrorFilter,
    },
    GlobalErrorFilter,
    TicketsService,
    // The Ory guards are registered under injection tokens so that routes can
    // reference them by token (e.g. inside an OrGuard).
    {
      provide: ORY_AUTH_GUARD,
      useClass: OryAuthenticationGuard(),
    },
    {
      provide: ORY_OAUTH2_GUARD,
      useClass: OryOAuth2AuthenticationGuard(),
    },
  ],
  exports: [MongooseFeatures, Clients, TicketsService],
})
export class TicketsModule {}
Use the `FeatureFlagsGuard` to protect the upload/download routes.
Solution
// apps/tickets/src/app/tickets/tickets.controller.ts
import { OryPermissionChecks } from "@getlarge/keto-client-wrapper";
import { relationTupleBuilder } from "@getlarge/keto-relations-parser";
import {
type StreamStorageFile,
FileInterceptor,
StreamStorage,
UploadedFile,
} from "@getlarge/nestjs-tools-fastify-upload";
import { OrGuard } from "@nest-lab/or-guard";
import {
applyDecorators,
Body,
Controller,
FileTypeValidator,
Get,
HttpStatus,
MaxFileSizeValidator,
Param,
ParseFilePipe,
Patch,
Post,
Query,
StreamableFile,
UseGuards,
UseInterceptors,
UsePipes,
ValidationPipe,
ValidationPipeOptions,
} from "@nestjs/common";
import {
ApiBearerAuth,
ApiBody,
ApiConsumes,
ApiCookieAuth,
ApiExtraModels,
ApiOperation,
ApiResponse,
ApiTags,
} from "@nestjs/swagger";
import { SecurityRequirements } from "@ticketing/microservices/shared/constants";
import {
ApiNestedQuery,
ApiPaginatedDto,
CurrentUser,
} from "@ticketing/microservices/shared/decorators";
import { FeatureFlagsGuard } from "@ticketing/microservices/shared/feature-flags";
import {
OryAuthenticationGuard,
OryAuthorizationGuard,
} from "@ticketing/microservices/shared/guards";
import {
PaginatedDto,
PaginateDto,
PaginateQuery,
PermissionNamespaces,
} from "@ticketing/microservices/shared/models";
import {
ParseObjectId,
ParseQuery,
} from "@ticketing/microservices/shared/pipes";
import {
Actions,
CURRENT_USER_KEY,
FeatureFlags,
Resources,
} from "@ticketing/shared/constants";
import { requestValidationErrorFactory } from "@ticketing/shared/errors";
import { User } from "@ticketing/shared/models";
import type { FastifyRequest } from "fastify/types/request";
import { ORY_AUTH_GUARD, ORY_OAUTH2_GUARD } from "../shared/constants";
import {
CreateTicket,
CreateTicketDto,
Ticket,
TicketDto,
UpdateTicket,
UpdateTicketDto,
UploadTicketImageDto,
} from "./models";
import { TicketsService } from "./tickets.service";
/**
 * Shared `ValidationPipe` settings for routes that validate a request body:
 * payloads are transformed (with implicit conversion), unknown values are
 * rejected, and failures are built by `requestValidationErrorFactory`.
 */
const validationPipeOptions: ValidationPipeOptions = {
  forbidUnknownValues: true,
  transform: true,
  transformOptions: { enableImplicitConversion: true },
  exceptionFactory: requestValidationErrorFactory,
};
/**
 * Method decorator declaring the permission check "current user is in the
 * `owners` relation of the ticket identified by the `:id` route parameter".
 * The relation tuple is built from the incoming request.
 */
const IsTicketOwner = (): MethodDecorator =>
  applyDecorators(
    OryPermissionChecks((executionContext) => {
      const request = executionContext
        .switchToHttp()
        .getRequest<FastifyRequest>();
      const userId = request[`${CURRENT_USER_KEY}`]["id"];
      const { id: ticketId } = request.params as { id: string };

      const tuple = relationTupleBuilder()
        .subject(PermissionNamespaces[Resources.USERS], userId)
        .isIn("owners")
        .of(PermissionNamespaces[Resources.TICKETS], ticketId);
      return tuple.toString();
    }),
  );
/**
 * HTTP controller for the tickets resource.
 *
 * Every handler delegates to `TicketsService`; the decorators on each route
 * declare its guards, validation pipes and OpenAPI metadata.
 */
@Controller(Resources.TICKETS)
@ApiTags(Resources.TICKETS)
@ApiExtraModels(PaginatedDto)
export class TicketsController {
constructor(private readonly ticketsService: TicketsService) {}
/**
 * Creates a ticket for the current user.
 *
 * Guarded by an `OrGuard` over the two Ory guards registered under
 * `ORY_AUTH_GUARD` and `ORY_OAUTH2_GUARD`.
 *
 * @param ticket - validated creation payload
 * @param currentUser - user extracted by the `CurrentUser` decorator
 * @returns the result of `TicketsService.create`
 */
@UseGuards(OrGuard([ORY_AUTH_GUARD, ORY_OAUTH2_GUARD]))
@UsePipes(new ValidationPipe(validationPipeOptions))
@ApiBearerAuth(SecurityRequirements.Bearer)
@ApiCookieAuth(SecurityRequirements.Session)
@ApiOperation({
description: "Request creation of a ticket",
summary: `Create a ticket - Scope : ${Resources.TICKETS}:${Actions.CREATE_ONE}`,
})
@ApiBody({ type: CreateTicketDto })
@ApiResponse({
status: HttpStatus.CREATED,
description: "Ticket created",
type: TicketDto,
})
@Post("")
create(
@Body() ticket: CreateTicket,
@CurrentUser() currentUser: User,
): Promise<Ticket> {
return this.ticketsService.create(ticket, currentUser);
}
/**
 * Lists tickets matching the pagination/filter query.
 *
 * No guard is declared on this route. The validation pipe is configured
 * inline instead of reusing `validationPipeOptions` because
 * `forbidUnknownValues` is intentionally left off here (see the inline note).
 *
 * @param paginate - query parsed by `ParseQuery`
 * @returns the result of `TicketsService.find`
 */
@UsePipes(
new ValidationPipe({
exceptionFactory: requestValidationErrorFactory,
transform: true,
transformOptions: { enableImplicitConversion: true },
// forbidUnknownValues: true, //! FIX issue with query parsing process
}),
)
@ApiOperation({
description: "Filter tickets",
summary: `Find tickets - Scope : ${Resources.TICKETS}:${Actions.READ_MANY}`,
})
@ApiNestedQuery(PaginateDto)
@ApiPaginatedDto(TicketDto, "Tickets found")
@Get("")
find(
@Query(ParseQuery) paginate: PaginateQuery,
): Promise<PaginatedDto<Ticket>> {
return this.ticketsService.find(paginate);
}
/**
 * Fetches a single ticket by id. No guard is declared on this route.
 *
 * @param id - route parameter, passed through the `ParseObjectId` pipe
 * @returns the result of `TicketsService.findById`
 */
@ApiOperation({
description: "Request a ticket by id",
summary: `Find a ticket - Scope : ${Resources.TICKETS}:${Actions.READ_ONE}`,
})
@ApiResponse({
status: HttpStatus.OK,
description: "Ticket found",
type: TicketDto,
})
@Get(":id")
findById(@Param("id", ParseObjectId) id: string): Promise<Ticket> {
return this.ticketsService.findById(id);
}
/**
 * Updates a ticket by id.
 *
 * Requires authentication (either Ory guard, via `OrGuard`) followed by the
 * `OryAuthorizationGuard`, with the ownership check declared by
 * `IsTicketOwner`.
 *
 * @param id - route parameter, passed through the `ParseObjectId` pipe
 * @param ticket - validated update payload
 * @returns the result of `TicketsService.updateById`
 */
// TODO: check permission for ticket orderId if present
@IsTicketOwner()
@UseGuards(
OrGuard([ORY_AUTH_GUARD, ORY_OAUTH2_GUARD]),
OryAuthorizationGuard(),
)
@UsePipes(new ValidationPipe(validationPipeOptions))
@ApiBearerAuth(SecurityRequirements.Bearer)
@ApiCookieAuth(SecurityRequirements.Session)
@ApiOperation({
description: "Update a ticket by id",
summary: `Update a ticket - Scope : ${Resources.TICKETS}:${Actions.UPDATE_ONE}`,
})
@ApiBody({ type: UpdateTicketDto })
@ApiResponse({
status: HttpStatus.OK,
description: "Ticket updated",
type: TicketDto,
})
@Patch(":id")
updateById(
@Param("id", ParseObjectId) id: string,
@Body() ticket: UpdateTicket,
): Promise<Ticket> {
return this.ticketsService.updateById(id, ticket);
}
/**
 * Uploads an image for a ticket (multipart form field `file`).
 *
 * The feature-flag guard for `TICKET_IMAGE_UPLOAD` is listed first, ahead of
 * the authentication and authorization guards; ownership is declared through
 * `IsTicketOwner`. The uploaded file is validated with a maximum size of
 * 100000 and a file type of `image/jpeg`, then its stream is handed to the
 * service.
 *
 * @param id - route parameter, passed through the `ParseObjectId` pipe
 * @param file - uploaded file exposed as a stream by `StreamStorage`
 * @returns the result of `TicketsService.uploadTicketImage`
 */
@IsTicketOwner()
@UseGuards(
FeatureFlagsGuard(FeatureFlags.TICKET_IMAGE_UPLOAD),
OryAuthenticationGuard(),
OryAuthorizationGuard(),
)
@UseInterceptors(
FileInterceptor("file", {
storage: new StreamStorage(),
}),
)
@ApiBearerAuth(SecurityRequirements.Bearer)
@ApiCookieAuth(SecurityRequirements.Session)
@ApiOperation({
description: "Upload ticket image by id",
summary: `Upload ticket image - Scope : ${Resources.TICKETS}:${Actions.UPDATE_ONE}`,
})
@ApiConsumes("multipart/form-data")
@ApiBody({ type: UploadTicketImageDto })
@ApiResponse({
status: HttpStatus.OK,
description: "Ticket",
type: TicketDto,
})
@Patch(":id/image")
uploadTicketImage(
@Param("id", ParseObjectId) id: string,
@UploadedFile(
new ParseFilePipe({
validators: [
new MaxFileSizeValidator({ maxSize: 100000 }),
new FileTypeValidator({ fileType: "image/jpeg" }),
],
}),
)
file: StreamStorageFile,
): Promise<Ticket> {
return this.ticketsService.uploadTicketImage(id, file.stream);
}
/**
 * Downloads the image attached to a ticket.
 *
 * Only the `TICKET_IMAGE_UPLOAD` feature-flag guard is applied; no
 * authentication guard is declared on this route. The stream returned by the
 * service is wrapped in a `StreamableFile` served inline as `image/jpeg`.
 *
 * @param id - route parameter, passed through the `ParseObjectId` pipe
 * @returns a `StreamableFile` wrapping the image stream
 */
@UseGuards(FeatureFlagsGuard(FeatureFlags.TICKET_IMAGE_UPLOAD))
@ApiOperation({
description: "Download ticket image by id",
summary: `Download ticket image - Scope : ${Resources.TICKETS}:${Actions.READ_ONE}`,
})
@ApiResponse({
status: HttpStatus.OK,
description: "Ticket image",
content: {
"image/jpeg": {
schema: {
type: "string",
format: "binary",
},
},
},
})
@Get(":id/image")
async downloadTicketImage(
@Param("id", ParseObjectId) id: string,
): Promise<StreamableFile> {
const stream = await this.ticketsService.downloadTicketImage(id);
return new StreamableFile(stream, {
disposition: "inline",
type: "image/jpeg",
});
}
}
Consider using:
- the @nestjs/swagger plugin at build time
- Nestia?