From e73d9c1982deb95345918515e84e47f5ec0cb82a Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Sat, 13 Jul 2024 16:07:16 -0300 Subject: [PATCH] chore: Integration with MinIO and S3 Adds support for MinIO and S3 for storing media files. Modified several files to implement this feature, including package.json, prisma/postgresql-schema.prisma, src/api/integrations/typebot/services/typebot.service.ts, src/api/routes/index.router.ts, src/api/services/channels/whatsapp.baileys.service.ts, and src/config/env.config.ts. Added untracked files for the new S3 integration. Also added a new S3Controller and S3Service for handling S3 related operations. This change allows for more flexible media storage options and enables the use of MinIO or S3 for storing media files. --- package.json | 1 + .../migration.sql | 24 +++++ prisma/postgresql-schema.prisma | 14 +++ .../s3/controllers/s3.controller.ts | 15 ++++ src/api/integrations/s3/dto/media.dto.ts | 6 ++ src/api/integrations/s3/libs/minio.server.ts | 88 +++++++++++++++++++ src/api/integrations/s3/routes/s3.router.ts | 36 ++++++++ .../integrations/s3/services/s3.service.ts | 50 +++++++++++ src/api/integrations/s3/validate/s3.schema.ts | 43 +++++++++ .../typebot/services/typebot.service.ts | 6 +- src/api/routes/index.router.ts | 2 + src/api/server.module.ts | 5 ++ .../channels/whatsapp.baileys.service.ts | 69 ++++++++++++--- src/config/env.config.ts | 20 +++++ 14 files changed, 364 insertions(+), 15 deletions(-) create mode 100644 prisma/migrations/20240713184337_add_media_table/migration.sql create mode 100644 src/api/integrations/s3/controllers/s3.controller.ts create mode 100644 src/api/integrations/s3/dto/media.dto.ts create mode 100644 src/api/integrations/s3/libs/minio.server.ts create mode 100644 src/api/integrations/s3/routes/s3.router.ts create mode 100644 src/api/integrations/s3/services/s3.service.ts create mode 100644 src/api/integrations/s3/validate/s3.schema.ts diff --git a/package.json b/package.json index 7d027aa11..0d6f28ee2 100644 --- a/package.json +++ b/package.json @@ -76,6 +76,7 @@ "js-yaml": "^4.1.0", "jsonschema": "^1.4.1", "link-preview-js": "^3.0.4", + "minio": "^8.0.1", "node-cache": "^5.1.2", "node-mime-types": "^1.1.0", "node-windows": "^1.0.0-beta.8", diff --git a/prisma/migrations/20240713184337_add_media_table/migration.sql b/prisma/migrations/20240713184337_add_media_table/migration.sql new file mode 100644 index 000000000..17e038021 --- /dev/null +++ b/prisma/migrations/20240713184337_add_media_table/migration.sql @@ -0,0 +1,24 @@ +-- CreateTable +CREATE TABLE "Media" ( + "id" TEXT NOT NULL, + "fileName" VARCHAR(500) NOT NULL, + "type" VARCHAR(100) NOT NULL, + "mimetype" VARCHAR(100) NOT NULL, + "createdAt" DATE DEFAULT CURRENT_TIMESTAMP, + "messageId" TEXT NOT NULL, + "instanceId" TEXT NOT NULL, + + CONSTRAINT "Media_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "Media_fileName_key" ON "Media"("fileName"); + +-- CreateIndex +CREATE UNIQUE INDEX "Media_messageId_key" ON "Media"("messageId"); + +-- AddForeignKey +ALTER TABLE "Media" ADD CONSTRAINT "Media_messageId_fkey" FOREIGN KEY ("messageId") REFERENCES "Message"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "Media" ADD CONSTRAINT "Media_instanceId_fkey" FOREIGN KEY ("instanceId") REFERENCES "Instance"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/prisma/postgresql-schema.prisma b/prisma/postgresql-schema.prisma index 052f064c2..d90d03843 100644 --- a/prisma/postgresql-schema.prisma +++ b/prisma/postgresql-schema.prisma @@ -76,6 +76,7 @@ model Instance { MessageUpdate MessageUpdate[] TypebotSession TypebotSession[] TypebotSetting TypebotSetting? + Media Media[] } model Session { @@ -127,6 +128,7 @@ model Message { typebotSessionId String? MessageUpdate MessageUpdate[] TypebotSession TypebotSession? @relation(fields: [typebotSessionId], references: [id]) + Media Media? } model MessageUpdate { @@ -311,3 +313,15 @@ model TypebotSetting { Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade) instanceId String @unique } + +model Media { + id String @id @default(cuid()) + fileName String @unique @db.VarChar(500) + type String @db.VarChar(100) + mimetype String @db.VarChar(100) + createdAt DateTime? @default(now()) @db.Date + Message Message @relation(fields: [messageId], references: [id], onDelete: Cascade) + messageId String @unique + Instance Instance @relation(fields: [instanceId], references: [id], onDelete: Cascade) + instanceId String +} diff --git a/src/api/integrations/s3/controllers/s3.controller.ts b/src/api/integrations/s3/controllers/s3.controller.ts new file mode 100644 index 000000000..132b6f76f --- /dev/null +++ b/src/api/integrations/s3/controllers/s3.controller.ts @@ -0,0 +1,15 @@ +import { InstanceDto } from '../../../dto/instance.dto'; +import { MediaDto } from '../dto/media.dto'; +import { S3Service } from '../services/s3.service'; + +export class S3Controller { + constructor(private readonly s3Service: S3Service) {} + + public async getMedia(instance: InstanceDto, data: MediaDto) { + return this.s3Service.getMedia(instance, data); + } + + public async getMediaUrl(instance: InstanceDto, data: MediaDto) { + return this.s3Service.getMediaUrl(instance, data); + } +} diff --git a/src/api/integrations/s3/dto/media.dto.ts b/src/api/integrations/s3/dto/media.dto.ts new file mode 100644 index 000000000..1c6c58557 --- /dev/null +++ b/src/api/integrations/s3/dto/media.dto.ts @@ -0,0 +1,6 @@ +export class MediaDto { + id?: string; + type?: string; + messageId?: number; + expiry?: number; +} diff --git a/src/api/integrations/s3/libs/minio.server.ts b/src/api/integrations/s3/libs/minio.server.ts new file mode 100644 index 000000000..a2afb3057 --- /dev/null +++ b/src/api/integrations/s3/libs/minio.server.ts @@ -0,0 +1,88 @@ +import * as MinIo from 'minio'; +import { join } from 'path'; +import { Readable, Transform } from 'stream'; + +import { ConfigService, S3 } from '../../../../config/env.config'; +import { Logger } from '../../../../config/logger.config'; +import { BadRequestException } from '../../../../exceptions'; + +const logger = new Logger('S3 Service'); + +const BUCKET = new ConfigService().get('S3'); + +interface Metadata extends MinIo.ItemBucketMetadata { + 'Content-Type': string; +} + +const minioClient = (() => { + if (BUCKET?.ENABLE) { + return new MinIo.Client({ + endPoint: BUCKET.ENDPOINT, + port: BUCKET.PORT, + useSSL: BUCKET.USE_SSL, + accessKey: BUCKET.ACCESS_KEY, + secretKey: BUCKET.SECRET_KEY, + }); + } +})(); + +const bucketName = process.env.S3_BUCKET; + +const bucketExists = async () => { + if (minioClient) { + try { + const list = await minioClient.listBuckets(); + return list.find((bucket) => bucket.name === bucketName); + } catch (error) { + return false; + } + } +}; + +const createBucket = async () => { + if (minioClient) { + try { + const exists = await bucketExists(); + if (!exists) { + await minioClient.makeBucket(bucketName); + } + + logger.info(`S3 Bucket ${bucketName} - ON`); + return true; + } catch (error) { + console.log('S3 ERROR: ', error); + return false; + } + } +}; + +createBucket(); + +const uploadFile = async (fileName: string, file: Buffer | Transform | Readable, size: number, metadata: Metadata) => { + if (minioClient) { + const objectName = join('evolution-api', fileName); + try { + metadata['custom-header-application'] = 'evolution-api'; + return await minioClient.putObject(bucketName, objectName, file, size, metadata); + } catch (error) { + console.log('ERROR: ', error); + return error; + } + } +}; + +const getObjectUrl = async (fileName: string, expiry?: number) => { + if (minioClient) { + try { + const objectName = join('evolution-api', fileName); + if (expiry) { + return await minioClient.presignedGetObject(bucketName, objectName, expiry); + } + return await minioClient.presignedGetObject(bucketName, objectName); + } catch (error) { + throw new BadRequestException(error?.message); + } + } +}; + +export { BUCKET, getObjectUrl, uploadFile }; diff --git a/src/api/integrations/s3/routes/s3.router.ts b/src/api/integrations/s3/routes/s3.router.ts new file mode 100644 index 000000000..bdbabc1df --- /dev/null +++ b/src/api/integrations/s3/routes/s3.router.ts @@ -0,0 +1,36 @@ +import { RequestHandler, Router } from 'express'; + +import { RouterBroker } from '../../../abstract/abstract.router'; +import { HttpStatus } from '../../../routes/index.router'; +import { s3Controller } from '../../../server.module'; +import { MediaDto } from '../dto/media.dto'; +import { s3Schema, s3UrlSchema } from '../validate/s3.schema'; + +export class S3Router extends RouterBroker { + constructor(...guards: RequestHandler[]) { + super(); + this.router + .post(this.routerPath('getMedia'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: s3Schema, + ClassRef: MediaDto, + execute: (instance, data) => s3Controller.getMedia(instance, data), + }); + + res.status(HttpStatus.CREATED).json(response); + }) + .post(this.routerPath('getMediaUrl'), ...guards, async (req, res) => { + const response = await this.dataValidate({ + request: req, + schema: s3UrlSchema, + ClassRef: MediaDto, + execute: (instance, data) => s3Controller.getMediaUrl(instance, data), + }); + + res.status(HttpStatus.OK).json(response); + }); + } + + public readonly router = Router(); +} diff --git a/src/api/integrations/s3/services/s3.service.ts b/src/api/integrations/s3/services/s3.service.ts new file mode 100644 index 000000000..30ababbbc --- /dev/null +++ b/src/api/integrations/s3/services/s3.service.ts @@ -0,0 +1,50 @@ +import { Logger } from '../../../../config/logger.config'; +import { BadRequestException } from '../../../../exceptions'; +import { InstanceDto } from '../../../dto/instance.dto'; +import { PrismaRepository } from '../../../repository/repository.service'; +import { MediaDto } from '../dto/media.dto'; +import { getObjectUrl } from '../libs/minio.server'; + +export class S3Service { + constructor(private readonly prismaRepository: PrismaRepository) {} + + private readonly logger = new Logger(S3Service.name); + + public async getMedia(instance: InstanceDto, query?: MediaDto) { + try { + const where: any = { + instanceId: instance.instanceId, + ...query, + }; + + const media = await this.prismaRepository.media.findMany({ + where, + select: { + id: true, + fileName: true, + type: true, + mimetype: true, + createdAt: true, + Message: true, + }, + }); + + if (!media || media.length === 0) { + throw 'Media not found'; + } + + return media; + } catch (error) { + throw new BadRequestException(error); + } + } + + public async getMediaUrl(instance: InstanceDto, data: MediaDto) { + const media = (await this.getMedia(instance, { id: data.id }))[0]; + const mediaUrl = await getObjectUrl(media.fileName, data.expiry); + return { + mediaUrl, + ...media, + }; + } +} diff --git a/src/api/integrations/s3/validate/s3.schema.ts b/src/api/integrations/s3/validate/s3.schema.ts new file mode 100644 index 000000000..00709a9b0 --- /dev/null +++ b/src/api/integrations/s3/validate/s3.schema.ts @@ -0,0 +1,43 @@ +import { JSONSchema7 } from 'json-schema'; +import { v4 } from 'uuid'; + +const isNotEmpty = (...propertyNames: string[]): JSONSchema7 => { + const properties = {}; + propertyNames.forEach( + (property) => + (properties[property] = { + minLength: 1, + description: `The "${property}" cannot be empty`, + }), + ); + return { + if: { + propertyNames: { + enum: [...propertyNames], + }, + }, + then: { properties }, + }; +}; + +export const s3Schema: JSONSchema7 = { + $id: v4(), + type: 'object', + properties: { + id: { type: 'string' }, + type: { type: 'string' }, + messageId: { type: 'integer' }, + }, + ...isNotEmpty('id', 'type', 'messageId'), +}; + +export const s3UrlSchema: JSONSchema7 = { + $id: v4(), + type: 'object', + properties: { + id: { type: 'string', pattern: '\\d+', minLength: 1 }, + expiry: { type: 'string', pattern: '\\d+', minLength: 1 }, + }, + ...isNotEmpty('id'), + required: ['id'], +}; diff --git a/src/api/integrations/typebot/services/typebot.service.ts b/src/api/integrations/typebot/services/typebot.service.ts index 955264a5b..b5bcb1e85 100644 --- a/src/api/integrations/typebot/services/typebot.service.ts +++ b/src/api/integrations/typebot/services/typebot.service.ts @@ -651,17 +651,17 @@ export class TypebotService { if (ignoreGroups && remoteJid.includes('@g.us')) { this.logger.warn('Ignoring message from group: ' + remoteJid); - return; + throw new Error('Group not allowed'); } if (ignoreContacts && remoteJid.includes('@s.whatsapp.net')) { this.logger.warn('Ignoring message from contact: ' + remoteJid); - return; + throw new Error('Contact not allowed'); } if (ignoreJids.includes(remoteJid)) { this.logger.warn('Ignoring message from jid: ' + remoteJid); - return; + throw new Error('Jid not allowed'); } } diff --git a/src/api/routes/index.router.ts b/src/api/routes/index.router.ts index 9e407eb37..f809345bb 100644 --- a/src/api/routes/index.router.ts +++ b/src/api/routes/index.router.ts @@ -6,6 +6,7 @@ import { authGuard } from '../guards/auth.guard'; import { instanceExistsGuard, instanceLoggedGuard } from '../guards/instance.guard'; import { ChatwootRouter } from '../integrations/chatwoot/routes/chatwoot.router'; import { RabbitmqRouter } from '../integrations/rabbitmq/routes/rabbitmq.router'; +import { S3Router } from '../integrations/s3/routes/s3.router'; import { SqsRouter } from '../integrations/sqs/routes/sqs.router'; import { TypebotRouter } from '../integrations/typebot/routes/typebot.router'; import { WebsocketRouter } from '../integrations/websocket/routes/websocket.router'; @@ -63,6 +64,7 @@ router .use('/typebot', new TypebotRouter(...guards).router) .use('/proxy', new ProxyRouter(...guards).router) .use('/label', new LabelRouter(...guards).router) + .use('/s3', new S3Router(...guards).router) .get('/webhook/meta', async (req, res) => { if (req.query['hub.verify_token'] === configService.get('WA_BUSINESS').TOKEN_WEBHOOK) res.send(req.query['hub.challenge']); diff --git a/src/api/server.module.ts b/src/api/server.module.ts index 73111e318..c99115fa3 100644 --- a/src/api/server.module.ts +++ b/src/api/server.module.ts @@ -15,6 +15,8 @@ import { ChatwootController } from './integrations/chatwoot/controllers/chatwoot import { ChatwootService } from './integrations/chatwoot/services/chatwoot.service'; import { RabbitmqController } from './integrations/rabbitmq/controllers/rabbitmq.controller'; import { RabbitmqService } from './integrations/rabbitmq/services/rabbitmq.service'; +import { S3Controller } from './integrations/s3/controllers/s3.controller'; +import { S3Service } from './integrations/s3/services/s3.service'; import { SqsController } from './integrations/sqs/controllers/sqs.controller'; import { SqsService } from './integrations/sqs/services/sqs.service'; import { TypebotController } from './integrations/typebot/controllers/typebot.controller'; @@ -63,6 +65,9 @@ const authService = new AuthService(prismaRepository); const typebotService = new TypebotService(waMonitor, configService, prismaRepository); export const typebotController = new TypebotController(typebotService); +const s3Service = new S3Service(prismaRepository); +export const s3Controller = new S3Controller(s3Service); + const webhookService = new WebhookService(waMonitor, prismaRepository); export const webhookController = new WebhookController(webhookService, waMonitor); diff --git a/src/api/services/channels/whatsapp.baileys.service.ts b/src/api/services/channels/whatsapp.baileys.service.ts index e4fe38d73..38f0125a6 100644 --- a/src/api/services/channels/whatsapp.baileys.service.ts +++ b/src/api/services/channels/whatsapp.baileys.service.ts @@ -48,6 +48,7 @@ import ffmpeg from 'fluent-ffmpeg'; // import ffmpeg from 'fluent-ffmpeg'; import { existsSync, readFileSync } from 'fs'; import Long from 'long'; +import mime from 'mime'; import NodeCache from 'node-cache'; import { getMIMEType } from 'node-mime-types'; import { release } from 'os'; @@ -57,6 +58,7 @@ import qrcode, { QRCodeToDataURLOptions } from 'qrcode'; import qrcodeTerminal from 'qrcode-terminal'; import sharp from 'sharp'; import { PassThrough } from 'stream'; +import { v4 } from 'uuid'; import { CacheEngine } from '../../../cache/cacheengine'; import { @@ -69,6 +71,7 @@ import { Log, ProviderSession, QrCode, + S3, Typebot, } from '../../../config/env.config'; import { INSTANCE_DIR } from '../../../config/path.config'; @@ -125,6 +128,7 @@ import { StatusMessage, } from '../../dto/sendMessage.dto'; import { chatwootImport } from '../../integrations/chatwoot/utils/chatwoot-import-helper'; +import * as s3Service from '../../integrations/s3/libs/minio.server'; import { ProviderFiles } from '../../provider/sessions'; import { PrismaRepository } from '../../repository/repository.service'; import { waMonitor } from '../../server.module'; @@ -1044,10 +1048,7 @@ export class BaileysStartupService extends ChannelStartupService { const contentMsg = received?.message[getContentType(received.message)] as any; - if ( - this.localWebhook.webhookBase64 === true || - (this.configService.get('TYPEBOT').SEND_MEDIA_BASE64 && isMedia) - ) { + if (this.localWebhook.webhookBase64 === true && isMedia) { const buffer = await downloadMediaMessage( { key: received.key, message: received?.message }, 'buffer', @@ -1092,10 +1093,6 @@ export class BaileysStartupService extends ChannelStartupService { await this.client.readMessages([received.key]); } - this.logger.log(messageRaw); - - this.sendDataWebhook(Events.MESSAGES_UPSERT, messageRaw); - if ( this.configService.get('CHATWOOT').ENABLED && this.localChatwoot.enabled && @@ -1114,10 +1111,52 @@ export class BaileysStartupService extends ChannelStartupService { } } - await this.prismaRepository.message.create({ + const msg = await this.prismaRepository.message.create({ data: messageRaw, }); + if (this.configService.get('S3').ENABLE && isMedia) { + try { + const message: any = received; + const media = await this.getBase64FromMediaMessage( + { + message, + }, + true, + ); + + const { buffer, mediaType, fileName, size } = media; + + const mimetype = mime.lookup(fileName).toString(); + + const fullName = join(`${this.instance.id}`, received.key.remoteJid, mediaType, fileName); + + await s3Service.uploadFile(fullName, buffer, size.fileLength, { + 'Content-Type': mimetype, + }); + + await this.prismaRepository.media.create({ + data: { + messageId: msg.id, + instanceId: this.instanceId, + type: mediaType, + fileName: fullName, + mimetype, + }, + }); + + const mediaUrl = await s3Service.getObjectUrl(fullName); + + messageRaw.message.mediaUrl = mediaUrl; + } catch (error) { + this.logger.error(['Error on upload file to minio', error?.message, error?.stack]); + } + } + + this.logger.log(messageRaw); + + this.sendDataWebhook(Events.MESSAGES_UPSERT, messageRaw); + if (this.configService.get('TYPEBOT').ENABLED) { if (type === 'notify') { if (messageRaw.messageType !== 'reactionMessage') @@ -2772,7 +2811,7 @@ export class BaileysStartupService extends ChannelStartupService { } } - public async getBase64FromMediaMessage(data: getBase64FromMediaMessageDto) { + public async getBase64FromMediaMessage(data: getBase64FromMediaMessageDto, getBuffer: boolean) { try { const m = data?.message; const convertToMp4 = data?.convertToMp4 ?? false; @@ -2819,13 +2858,17 @@ export class BaileysStartupService extends ChannelStartupService { ); const typeMessage = getContentType(msg.message); + const ext = mime.extension(mediaMessage?.['mimetype']); + + const fileName = mediaMessage?.['fileName'] || `${msg.key.id}.${ext}` || `${v4()}.${ext}`; + if (convertToMp4 && typeMessage === 'audioMessage') { const convert = await this.processAudioMp4(buffer.toString('base64')); if (Buffer.isBuffer(convert)) { const result = { mediaType, - fileName: mediaMessage['fileName'], + fileName, caption: mediaMessage['caption'], size: { fileLength: mediaMessage['fileLength'], @@ -2834,6 +2877,7 @@ export class BaileysStartupService extends ChannelStartupService { }, mimetype: 'audio/mp4', base64: convert, + buffer: getBuffer ? convert : null, }; return result; @@ -2842,7 +2886,7 @@ export class BaileysStartupService extends ChannelStartupService { return { mediaType, - fileName: mediaMessage['fileName'], + fileName, caption: mediaMessage['caption'], size: { fileLength: mediaMessage['fileLength'], @@ -2851,6 +2895,7 @@ export class BaileysStartupService extends ChannelStartupService { }, mimetype: mediaMessage['mimetype'], base64: buffer.toString('base64'), + buffer: getBuffer ? buffer : null, }; } catch (error) { this.logger.error(error); diff --git a/src/config/env.config.ts b/src/config/env.config.ts index 2385972fb..8512e9fa7 100644 --- a/src/config/env.config.ts +++ b/src/config/env.config.ts @@ -191,6 +191,16 @@ export type Chatwoot = { }; }; +export type S3 = { + ACCESS_KEY: string; + SECRET_KEY: string; + ENDPOINT: string; + BUCKET_NAME: string; + ENABLE: boolean; + PORT?: number; + USE_SSL?: boolean; +}; + export type CacheConf = { REDIS: CacheConfRedis; LOCAL: CacheConfLocal }; export type Production = boolean; @@ -214,6 +224,7 @@ export interface Env { TYPEBOT: Typebot; CHATWOOT: Chatwoot; CACHE: CacheConf; + S3?: S3; AUTHENTICATION: Auth; PRODUCTION?: Production; } @@ -431,6 +442,15 @@ export class ConfigService { TTL: Number.parseInt(process.env?.CACHE_REDIS_TTL) || 86400, }, }, + S3: { + ACCESS_KEY: process.env?.S3_ACCESS_KEY, + SECRET_KEY: process.env?.S3_SECRET_KEY, + ENDPOINT: process.env?.S3_ENDPOINT, + BUCKET_NAME: process.env?.S3_BUCKET, + ENABLE: process.env?.S3_ENABLED === 'true', + PORT: Number.parseInt(process.env?.S3_PORT || '9000'), + USE_SSL: process.env?.S3_USE_SSL === 'true', + }, AUTHENTICATION: { API_KEY: { KEY: process.env.AUTHENTICATION_API_KEY || 'BQYHJGJHJ',