From b0e03a99f8974a2c13347c07426237b086b06014 Mon Sep 17 00:00:00 2001 From: forehalo Date: Wed, 13 Nov 2024 19:50:00 +0800 Subject: [PATCH] feat(nbstore): add cloud implementation --- .../20241023065619_blobs/migration.sql | 14 ++ packages/backend/server/schema.prisma | 20 +- .../server/src/core/doc/storage/doc.ts | 29 +++ .../server/src/core/doc/storage/index.ts | 4 +- .../backend/server/src/core/quota/resolver.ts | 23 ++- .../backend/server/src/core/quota/storage.ts | 6 +- .../server/src/core/storage/wrappers/blob.ts | 176 +++++++++++++---- .../backend/server/src/core/sync/gateway.ts | 130 +++++++++---- .../src/core/workspaces/resolvers/blob.ts | 95 +++++----- .../server/src/fundamentals/event/def.ts | 2 +- .../src/fundamentals/storage/providers/fs.ts | 2 +- .../storage/providers/provider.ts | 2 +- .../fundamentals/storage/providers/utils.ts | 2 +- .../src/plugins/storage/providers/s3.ts | 2 +- packages/backend/server/src/schema.gql | 20 +- packages/backend/server/tests/utils/blobs.ts | 29 +-- .../blobs.e2e.ts} | 61 +----- packages/common/nbstore/package.json | 11 +- .../common/nbstore/src/impls/cloud/blob.ts | 72 +++++++ .../common/nbstore/src/impls/cloud/doc.ts | 177 ++++++++++++++++++ .../common/nbstore/src/impls/cloud/index.ts | 2 + .../common/nbstore/src/impls/cloud/socket.ts | 173 +++++++++++++++++ packages/common/nbstore/src/impls/index.ts | 5 +- .../src/modules/cloud/stores/user-quota.ts | 2 +- .../impls/engine/blob-cloud.ts | 4 +- .../graphql/src/graphql/blob-delete.gql | 8 +- .../graphql/src/graphql/blob-list.gql | 9 +- .../src/graphql/blob-release-deleted.gql | 3 + .../frontend/graphql/src/graphql/index.ts | 34 +++- .../frontend/graphql/src/graphql/quota.gql | 6 +- packages/frontend/graphql/src/schema.ts | 67 +++++-- yarn.lock | 4 + 32 files changed, 946 insertions(+), 248 deletions(-) create mode 100644 packages/backend/server/migrations/20241023065619_blobs/migration.sql rename packages/backend/server/tests/{workspace-blobs.spec.ts => workspace/blobs.e2e.ts} (76%) create mode 100644 packages/common/nbstore/src/impls/cloud/blob.ts create mode 100644 packages/common/nbstore/src/impls/cloud/doc.ts create mode 100644 packages/common/nbstore/src/impls/cloud/index.ts create mode 100644 packages/common/nbstore/src/impls/cloud/socket.ts create mode 100644 packages/frontend/graphql/src/graphql/blob-release-deleted.gql diff --git a/packages/backend/server/migrations/20241023065619_blobs/migration.sql b/packages/backend/server/migrations/20241023065619_blobs/migration.sql new file mode 100644 index 0000000000000..22fac2f05801b --- /dev/null +++ b/packages/backend/server/migrations/20241023065619_blobs/migration.sql @@ -0,0 +1,14 @@ +-- CreateTable +CREATE TABLE "blobs" ( + "workspace_id" VARCHAR NOT NULL, + "key" VARCHAR NOT NULL, + "size" INTEGER NOT NULL, + "mime" VARCHAR NOT NULL, + "created_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "deleted_at" TIMESTAMPTZ(3), + + CONSTRAINT "blobs_pkey" PRIMARY KEY ("workspace_id","key") +); + +-- AddForeignKey +ALTER TABLE "blobs" ADD CONSTRAINT "blobs_workspace_id_fkey" FOREIGN KEY ("workspace_id") REFERENCES "workspaces"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/packages/backend/server/schema.prisma b/packages/backend/server/schema.prisma index acf2df7fa647d..9086965c81708 100644 --- a/packages/backend/server/schema.prisma +++ b/packages/backend/server/schema.prisma @@ -106,6 +106,7 @@ model Workspace { permissions WorkspaceUserPermission[] pagePermissions WorkspacePageUserPermission[] features WorkspaceFeature[] + blobs Blob[] @@map("workspaces") } @@ -335,7 +336,7 @@ model UserSubscription { // yearly/monthly/lifetime recurring String @db.VarChar(20) // onetime subscription or anything else - variant String? @db.VarChar(20) + variant String? @db.VarChar(20) // subscription.id, null for linefetime payment or one time payment subscription stripeSubscriptionId String? @unique @map("stripe_subscription_id") // subscription.status, active/past_due/canceled/unpaid... @@ -499,3 +500,20 @@ model RuntimeConfig { @@unique([module, key]) @@map("app_runtime_settings") } + +// Blob table only exists for fast non-data queries. +// like, total size of blobs in a workspace, or blob list for sync service. +// it should only be a map of metadata of blobs stored anywhere else +model Blob { + workspaceId String @map("workspace_id") @db.VarChar + key String @db.VarChar + size Int @db.Integer + mime String @db.VarChar + createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(3) + deletedAt DateTime? @map("deleted_at") @db.Timestamptz(3) + + workspace Workspace @relation(fields: [workspaceId], references: [id], onDelete: Cascade) + + @@id([workspaceId, key]) + @@map("blobs") +} diff --git a/packages/backend/server/src/core/doc/storage/doc.ts b/packages/backend/server/src/core/doc/storage/doc.ts index 6010855484812..4bcbf4067be68 100644 --- a/packages/backend/server/src/core/doc/storage/doc.ts +++ b/packages/backend/server/src/core/doc/storage/doc.ts @@ -1,8 +1,10 @@ import { applyUpdate, + diffUpdate, Doc, encodeStateAsUpdate, encodeStateVector, + encodeStateVectorFromUpdate, mergeUpdates, UndoManager, } from 'yjs'; @@ -19,6 +21,12 @@ export interface DocRecord { editor?: string; } +export interface DocDiff { + missing: Uint8Array; + state: Uint8Array; + timestamp: number; +} + export interface DocUpdate { bin: Uint8Array; timestamp: number; @@ -96,6 +104,27 @@ export abstract class DocStorageAdapter extends Connection { return snapshot; } + async getDocDiff( + spaceId: string, + docId: string, + stateVector?: Uint8Array + ): Promise { + const doc = await this.getDoc(spaceId, docId); + + if (!doc) { + return null; + } + + const missing = stateVector ? diffUpdate(doc.bin, stateVector) : doc.bin; + const state = encodeStateVectorFromUpdate(doc.bin); + + return { + missing, + state, + timestamp: doc.timestamp, + }; + } + abstract pushDocUpdates( spaceId: string, docId: string, diff --git a/packages/backend/server/src/core/doc/storage/index.ts b/packages/backend/server/src/core/doc/storage/index.ts index 6ba0e23dd1119..7521fb6cca367 100644 --- a/packages/backend/server/src/core/doc/storage/index.ts +++ b/packages/backend/server/src/core/doc/storage/index.ts @@ -1,4 +1,6 @@ -// TODO(@forehalo): share with frontend +// This is a totally copy of definitions in [@affine/space-store] +// because currently importing cross workspace package from [@affine/server] is not yet supported +// should be kept updated with the original definitions in [@affine/space-store] import type { BlobStorageAdapter } from './blob'; import { Connection } from './connection'; import type { DocStorageAdapter } from './doc'; diff --git a/packages/backend/server/src/core/quota/resolver.ts b/packages/backend/server/src/core/quota/resolver.ts index c8afaa2237a01..309f43833ab6d 100644 --- a/packages/backend/server/src/core/quota/resolver.ts +++ b/packages/backend/server/src/core/quota/resolver.ts @@ -11,6 +11,7 @@ import { CurrentUser } from '../auth/session'; import { EarlyAccessType } from '../features'; import { UserType } from '../user'; import { QuotaService } from './service'; +import { QuotaManagementService } from './storage'; registerEnumType(EarlyAccessType, { name: 'EarlyAccessType', @@ -55,9 +56,18 @@ class UserQuotaType { humanReadable!: UserQuotaHumanReadableType; } +@ObjectType('UserQuotaUsage') +class UserQuotaUsageType { + @Field(() => SafeIntResolver, { name: 'storageQuota' }) + storageQuota!: number; +} + @Resolver(() => UserType) export class QuotaManagementResolver { - constructor(private readonly quota: QuotaService) {} + constructor( + private readonly quota: QuotaService, + private readonly management: QuotaManagementService + ) {} @ResolveField(() => UserQuotaType, { name: 'quota', nullable: true }) async getQuota(@CurrentUser() me: UserType) { @@ -65,4 +75,15 @@ export class QuotaManagementResolver { return quota.feature; } + + @ResolveField(() => UserQuotaUsageType, { name: 'quotaUsage' }) + async getQuotaUsage( + @CurrentUser() me: UserType + ): Promise { + const usage = await this.management.getUserStorageUsage(me.id); + + return { + storageQuota: usage, + }; + } } diff --git a/packages/backend/server/src/core/quota/storage.ts b/packages/backend/server/src/core/quota/storage.ts index ac77ace364f93..4e6910d635c50 100644 --- a/packages/backend/server/src/core/quota/storage.ts +++ b/packages/backend/server/src/core/quota/storage.ts @@ -40,7 +40,7 @@ export class QuotaManagementService { }; } - async getUserUsage(userId: string) { + async getUserStorageUsage(userId: string) { const workspaces = await this.permissions.getOwnedWorkspaces(userId); const sizes = await Promise.allSettled( @@ -88,7 +88,7 @@ export class QuotaManagementService { async getQuotaCalculator(userId: string) { const quota = await this.getUserQuota(userId); const { storageQuota, businessBlobLimit } = quota; - const usedSize = await this.getUserUsage(userId); + const usedSize = await this.getUserStorageUsage(userId); return this.generateQuotaCalculator( storageQuota, @@ -128,7 +128,7 @@ export class QuotaManagementService { }, } = await this.quota.getUserQuota(owner.id); // get all workspaces size of owner used - const usedSize = await this.getUserUsage(owner.id); + const usedSize = await this.getUserStorageUsage(owner.id); // relax restrictions if workspace has unlimited feature // todo(@darkskygit): need a mechanism to allow feature as a middleware to edit quota const unlimited = await this.feature.hasWorkspaceFeature( diff --git a/packages/backend/server/src/core/storage/wrappers/blob.ts b/packages/backend/server/src/core/storage/wrappers/blob.ts index 9aa991ab4b924..b6a80d31fe7cb 100644 --- a/packages/backend/server/src/core/storage/wrappers/blob.ts +++ b/packages/backend/server/src/core/storage/wrappers/blob.ts @@ -1,12 +1,12 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; +import { PrismaClient } from '@prisma/client'; import { - type BlobInputType, - Cache, Config, EventEmitter, type EventPayload, - type ListObjectsMetadata, + type GetObjectMetadata, + ListObjectsMetadata, OnEvent, type StorageProvider, StorageProviderFactory, @@ -14,62 +14,164 @@ import { @Injectable() export class WorkspaceBlobStorage { + private readonly logger = new Logger(WorkspaceBlobStorage.name); public readonly provider: StorageProvider; constructor( private readonly config: Config, private readonly event: EventEmitter, private readonly storageFactory: StorageProviderFactory, - private readonly cache: Cache + private readonly db: PrismaClient ) { this.provider = this.storageFactory.create(this.config.storages.blob); } - async put(workspaceId: string, key: string, blob: BlobInputType) { - await this.provider.put(`${workspaceId}/${key}`, blob); - await this.cache.delete(`blob-list:${workspaceId}`); + async put(workspaceId: string, key: string, blob: Buffer, mime: string) { + const meta: GetObjectMetadata = { + contentType: mime, + contentLength: blob.byteLength, + lastModified: new Date(), + }; + await this.provider.put(`${workspaceId}/${key}`, blob, meta); + this.trySyncBlobMeta(workspaceId, key, meta); } async get(workspaceId: string, key: string) { - return this.provider.get(`${workspaceId}/${key}`); + const blob = await this.provider.get(`${workspaceId}/${key}`); + this.trySyncBlobMeta(workspaceId, key, blob.metadata); + return blob; } async list(workspaceId: string) { - const cachedList = await this.cache.list( - `blob-list:${workspaceId}`, - 0, - -1 - ); - - if (cachedList.length > 0) { - return cachedList; + const blobsInDb = await this.db.blob.findMany({ + where: { + workspaceId, + deletedAt: null, + }, + }); + + if (blobsInDb.length > 0) { + return blobsInDb; } const blobs = await this.provider.list(workspaceId + '/'); + this.trySyncBlobsMeta(workspaceId, blobs); + + return blobs.map(blob => ({ + key: blob.key, + size: blob.contentLength, + createdAt: blob.lastModified, + mime: 'application/octet-stream', + })); + } - blobs.forEach(item => { - // trim workspace prefix - item.key = item.key.slice(workspaceId.length + 1); - }); - - await this.cache.pushBack(`blob-list:${workspaceId}`, ...blobs); - - return blobs; + async delete(workspaceId: string, key: string, permanently = false) { + if (permanently) { + await this.provider.delete(`${workspaceId}/${key}`); + await this.db.blob.deleteMany({ + where: { + workspaceId, + key, + }, + }); + } else { + await this.db.blob.update({ + where: { + workspaceId_key: { + workspaceId, + key, + }, + }, + data: { + deletedAt: new Date(), + }, + }); + } } - /** - * we won't really delete the blobs until the doc blobs manager is implemented sounded - */ - async delete(_workspaceId: string, _key: string) { - // return this.provider.delete(`${workspaceId}/${key}`); + async release(workspaceId: string) { + const deletedBlobs = await this.db.blob.findMany({ + where: { + workspaceId, + deletedAt: { + not: null, + }, + }, + }); + + deletedBlobs.forEach(blob => { + this.event.emit('workspace.blob.deleted', { + workspaceId: workspaceId, + key: blob.key, + }); + }); } async totalSize(workspaceId: string) { const blobs = await this.list(workspaceId); - // how could we ignore the ones get soft-deleted? return blobs.reduce((acc, item) => acc + item.size, 0); } + private trySyncBlobsMeta(workspaceId: string, blobs: ListObjectsMetadata[]) { + for (const blob of blobs) { + this.trySyncBlobMeta(workspaceId, blob.key, { + contentType: 'application/octet-stream', + ...blob, + }); + } + } + + private trySyncBlobMeta( + workspaceId: string, + key: string, + meta?: GetObjectMetadata + ) { + setImmediate(() => { + this.syncBlobMeta(workspaceId, key, meta).catch(() => { + /* never throw */ + }); + }); + } + + private async syncBlobMeta( + workspaceId: string, + key: string, + meta?: GetObjectMetadata + ) { + try { + if (meta) { + await this.db.blob.upsert({ + where: { + workspaceId_key: { + workspaceId, + key, + }, + }, + update: { + mime: meta.contentType, + size: meta.contentLength, + }, + create: { + workspaceId, + key, + mime: meta.contentType, + size: meta.contentLength, + }, + }); + } else { + await this.db.blob.deleteMany({ + where: { + workspaceId, + key, + }, + }); + } + } catch (e) { + // never throw + this.logger.error('failed to sync blob meta to DB', e); + } + } + @OnEvent('workspace.deleted') async onWorkspaceDeleted(workspaceId: EventPayload<'workspace.deleted'>) { const blobs = await this.list(workspaceId); @@ -78,7 +180,7 @@ export class WorkspaceBlobStorage { blobs.forEach(blob => { this.event.emit('workspace.blob.deleted', { workspaceId: workspaceId, - name: blob.key, + key: blob.key, }); }); } @@ -86,8 +188,14 @@ export class WorkspaceBlobStorage { @OnEvent('workspace.blob.deleted') async onDeleteWorkspaceBlob({ workspaceId, - name, + key, }: EventPayload<'workspace.blob.deleted'>) { - await this.delete(workspaceId, name); + await this.db.blob.deleteMany({ + where: { + workspaceId, + key, + }, + }); + await this.delete(workspaceId, key); } } diff --git a/packages/backend/server/src/core/sync/gateway.ts b/packages/backend/server/src/core/sync/gateway.ts index d5686c658fd42..d1c1c5ff7759a 100644 --- a/packages/backend/server/src/core/sync/gateway.ts +++ b/packages/backend/server/src/core/sync/gateway.ts @@ -8,7 +8,6 @@ import { WebSocketGateway, } from '@nestjs/websockets'; import { Socket } from 'socket.io'; -import { diffUpdate, encodeStateVectorFromUpdate } from 'yjs'; import { AlreadyInSpace, @@ -83,6 +82,9 @@ interface LeaveSpaceAwarenessMessage { docId: string; } +/** + * @deprecated + */ interface PushDocUpdatesMessage { spaceType: SpaceType; spaceId: string; @@ -90,6 +92,13 @@ interface PushDocUpdatesMessage { updates: string[]; } +interface PushDocUpdateMessage { + spaceType: SpaceType; + spaceId: string; + docId: string; + update: string; +} + interface LoadDocMessage { spaceType: SpaceType; spaceId: string; @@ -97,6 +106,12 @@ interface LoadDocMessage { stateVector?: string; } +interface DeleteDocMessage { + spaceType: SpaceType; + spaceId: string; + docId: string; +} + interface LoadDocTimestampsMessage { spaceType: SpaceType; spaceId: string; @@ -114,6 +129,7 @@ interface UpdateAwarenessMessage { docId: string; awarenessUpdate: string; } + @WebSocketGateway() export class SpaceSyncGateway implements OnGatewayConnection, OnGatewayDisconnect @@ -182,26 +198,6 @@ export class SpaceSyncGateway } } - async joinWorkspace( - client: Socket, - room: `${string}:${'sync' | 'awareness'}` - ) { - await client.join(room); - } - - async leaveWorkspace( - client: Socket, - room: `${string}:${'sync' | 'awareness'}` - ) { - await client.leave(room); - } - - assertInWorkspace(client: Socket, room: `${string}:${'sync' | 'awareness'}`) { - if (!client.rooms.has(room)) { - throw new NotInSpace({ spaceId: room.split(':')[0] }); - } - } - // v3 @SubscribeMessage('space:join') async onJoinSpace( @@ -233,36 +229,42 @@ export class SpaceSyncGateway @MessageBody() { spaceType, spaceId, docId, stateVector }: LoadDocMessage ): Promise< - EventResponse<{ missing: string; state?: string; timestamp: number }> + EventResponse<{ missing: string; state: string; timestamp: number }> > { const adapter = this.selectAdapter(client, spaceType); adapter.assertIn(spaceId); - const doc = await adapter.get(spaceId, docId); + const doc = await adapter.diff( + spaceId, + docId, + stateVector ? Buffer.from(stateVector, 'base64') : undefined + ); if (!doc) { throw new DocNotFound({ spaceId, docId }); } - const missing = Buffer.from( - stateVector - ? diffUpdate(doc.bin, Buffer.from(stateVector, 'base64')) - : doc.bin - ).toString('base64'); - - const state = Buffer.from(encodeStateVectorFromUpdate(doc.bin)).toString( - 'base64' - ); - return { data: { - missing, - state, + missing: Buffer.from(doc.missing).toString('base64'), + state: Buffer.from(doc.state).toString('base64'), timestamp: doc.timestamp, }, }; } + @SubscribeMessage('space:delete-doc') + async onDeleteSpaceDoc( + @ConnectedSocket() client: Socket, + @MessageBody() { spaceType, spaceId, docId }: DeleteDocMessage + ) { + const adapter = this.selectAdapter(client, spaceType); + await adapter.delete(spaceId, docId); + } + + /** + * @deprecated use [space:push-doc-update] instead, client should always merge updates on their own + */ @SubscribeMessage('space:push-doc-updates') async onReceiveDocUpdates( @ConnectedSocket() client: Socket, @@ -307,6 +309,51 @@ export class SpaceSyncGateway }; } + @SubscribeMessage('space:push-doc-update') + async onReceiveDocUpdate( + @ConnectedSocket() client: Socket, + @CurrentUser() user: CurrentUser, + @MessageBody() + message: PushDocUpdateMessage + ): Promise> { + const { spaceType, spaceId, docId, update } = message; + const adapter = this.selectAdapter(client, spaceType); + + // TODO(@forehalo): we might need to check write permission before push updates + const timestamp = await adapter.push( + spaceId, + docId, + [Buffer.from(update, 'base64')], + user.id + ); + + // TODO(@forehalo): separate different version of clients into different rooms, + // so the clients won't receive useless updates events + client.to(adapter.room(spaceId)).emit('space:broadcast-doc-updates', { + spaceType, + spaceId, + docId, + updates: [update], + timestamp, + }); + + client.to(adapter.room(spaceId)).emit('space:broadcast-doc-update', { + spaceType, + spaceId, + docId, + update, + timestamp, + editor: user.id, + }); + + return { + data: { + accepted: true, + timestamp, + }, + }; + } + @SubscribeMessage('space:load-doc-timestamps') async onLoadDocTimestamps( @ConnectedSocket() client: Socket, @@ -600,9 +647,14 @@ abstract class SyncSocketAdapter { return this.storage.pushDocUpdates(spaceId, docId, updates, editorId); } - get(spaceId: string, docId: string) { + diff(spaceId: string, docId: string, stateVector?: Uint8Array) { + this.assertIn(spaceId); + return this.storage.getDocDiff(spaceId, docId, stateVector); + } + + delete(spaceId: string, docId: string) { this.assertIn(spaceId); - return this.storage.getDoc(spaceId, docId); + return this.storage.deleteDoc(spaceId, docId); } getTimestamps(spaceId: string, timestamp?: number) { @@ -630,9 +682,9 @@ class WorkspaceSyncAdapter extends SyncSocketAdapter { return super.push(spaceId, id.guid, updates, editorId); } - override get(spaceId: string, docId: string) { + override diff(spaceId: string, docId: string, stateVector?: Uint8Array) { const id = new DocID(docId, spaceId); - return this.storage.getDoc(spaceId, id.guid); + return this.storage.getDocDiff(spaceId, id.guid, stateVector); } async assertAccessible( diff --git a/packages/backend/server/src/core/workspaces/resolvers/blob.ts b/packages/backend/server/src/core/workspaces/resolvers/blob.ts index 82c8f6c1ca986..7edef0f431b6e 100644 --- a/packages/backend/server/src/core/workspaces/resolvers/blob.ts +++ b/packages/backend/server/src/core/workspaces/resolvers/blob.ts @@ -1,29 +1,40 @@ import { Logger, UseGuards } from '@nestjs/common'; import { Args, + Field, Int, Mutation, + ObjectType, Parent, Query, ResolveField, Resolver, } from '@nestjs/graphql'; -import { SafeIntResolver } from 'graphql-scalars'; import GraphQLUpload from 'graphql-upload/GraphQLUpload.mjs'; import type { FileUpload } from '../../../fundamentals'; -import { - BlobQuotaExceeded, - CloudThrottlerGuard, - MakeCache, - PreventCache, -} from '../../../fundamentals'; +import { BlobQuotaExceeded, CloudThrottlerGuard } from '../../../fundamentals'; import { CurrentUser } from '../../auth'; import { Permission, PermissionService } from '../../permission'; import { QuotaManagementService } from '../../quota'; import { WorkspaceBlobStorage } from '../../storage'; import { WorkspaceBlobSizes, WorkspaceType } from '../types'; +@ObjectType() +class ListedBlob { + @Field() + key!: string; + + @Field() + mime!: string; + + @Field() + size!: number; + + @Field() + createdAt!: string; +} + @UseGuards(CloudThrottlerGuard) @Resolver(() => WorkspaceType) export class WorkspaceBlobResolver { @@ -34,7 +45,7 @@ export class WorkspaceBlobResolver { private readonly storage: WorkspaceBlobStorage ) {} - @ResolveField(() => [String], { + @ResolveField(() => [ListedBlob], { description: 'List blobs of workspace', complexity: 2, }) @@ -44,9 +55,7 @@ export class WorkspaceBlobResolver { ) { await this.permissions.checkWorkspace(workspace.id, user.id); - return this.storage - .list(workspace.id) - .then(list => list.map(item => item.key)); + return this.storage.list(workspace.id); } @ResolveField(() => Int, { @@ -64,7 +73,6 @@ export class WorkspaceBlobResolver { description: 'List blobs of workspace', deprecationReason: 'use `workspace.blobs` instead', }) - @MakeCache(['blobs'], ['workspaceId']) async listBlobs( @CurrentUser() user: CurrentUser, @Args('workspaceId') workspaceId: string @@ -76,42 +84,15 @@ export class WorkspaceBlobResolver { .then(list => list.map(item => item.key)); } - /** - * @deprecated use `user.storageUsage` instead - */ @Query(() => WorkspaceBlobSizes, { - deprecationReason: 'use `user.storageUsage` instead', + deprecationReason: 'use `user.quotaUsage` instead', }) async collectAllBlobSizes(@CurrentUser() user: CurrentUser) { - const size = await this.quota.getUserUsage(user.id); + const size = await this.quota.getUserStorageUsage(user.id); return { size }; } - /** - * @deprecated mutation `setBlob` will check blob limit & quota usage - */ - @Query(() => WorkspaceBlobSizes, { - deprecationReason: 'no more needed', - }) - async checkBlobSize( - @CurrentUser() user: CurrentUser, - @Args('workspaceId') workspaceId: string, - @Args('size', { type: () => SafeIntResolver }) blobSize: number - ) { - const canWrite = await this.permissions.tryCheckWorkspace( - workspaceId, - user.id, - Permission.Write - ); - if (canWrite) { - const size = await this.quota.checkBlobQuota(workspaceId, blobSize); - return { size }; - } - return false; - } - @Mutation(() => String) - @PreventCache(['blobs'], ['workspaceId']) async setBlob( @CurrentUser() user: CurrentUser, @Args('workspaceId') workspaceId: string, @@ -155,20 +136,44 @@ export class WorkspaceBlobResolver { }); }); - await this.storage.put(workspaceId, blob.filename, buffer); + await this.storage.put(workspaceId, blob.filename, buffer, blob.mimetype); return blob.filename; } @Mutation(() => Boolean) - @PreventCache(['blobs'], ['workspaceId']) async deleteBlob( @CurrentUser() user: CurrentUser, @Args('workspaceId') workspaceId: string, - @Args('hash') name: string + @Args('hash', { + type: () => String, + deprecationReason: 'use parameter [key]', + nullable: true, + }) + hash?: string, + @Args('key', { type: () => String, nullable: true }) key?: string, + @Args('permanently', { type: () => Boolean, defaultValue: false }) + permanently = false + ) { + key = key ?? hash; + if (!key) { + return false; + } + + await this.permissions.checkWorkspace(workspaceId, user.id); + + await this.storage.delete(workspaceId, key, permanently); + + return true; + } + + @Mutation(() => Boolean) + async releaseDeletedBlobs( + @CurrentUser() user: CurrentUser, + @Args('workspaceId') workspaceId: string ) { await this.permissions.checkWorkspace(workspaceId, user.id); - await this.storage.delete(workspaceId, name); + await this.storage.release(workspaceId); return true; } diff --git a/packages/backend/server/src/fundamentals/event/def.ts b/packages/backend/server/src/fundamentals/event/def.ts index 33db14b7a2832..c9cb42381350a 100644 --- a/packages/backend/server/src/fundamentals/event/def.ts +++ b/packages/backend/server/src/fundamentals/event/def.ts @@ -7,7 +7,7 @@ export interface WorkspaceEvents { blob: { deleted: Payload<{ workspaceId: Workspace['id']; - name: string; + key: string; }>; }; } diff --git a/packages/backend/server/src/fundamentals/storage/providers/fs.ts b/packages/backend/server/src/fundamentals/storage/providers/fs.ts index 282396c98d0f0..c271d93ece789 100644 --- a/packages/backend/server/src/fundamentals/storage/providers/fs.ts +++ b/packages/backend/server/src/fundamentals/storage/providers/fs.ts @@ -119,7 +119,7 @@ export class FsStorageProvider implements StorageProvider { results.push({ key: res, lastModified: stat.mtime, - size: stat.size, + contentLength: stat.size, }); } } diff --git a/packages/backend/server/src/fundamentals/storage/providers/provider.ts b/packages/backend/server/src/fundamentals/storage/providers/provider.ts index 46f8ef688db35..5b2bad24b2473 100644 --- a/packages/backend/server/src/fundamentals/storage/providers/provider.ts +++ b/packages/backend/server/src/fundamentals/storage/providers/provider.ts @@ -21,7 +21,7 @@ export interface PutObjectMetadata { export interface ListObjectsMetadata { key: string; lastModified: Date; - size: number; + contentLength: number; } export type BlobInputType = Buffer | Readable | string; diff --git a/packages/backend/server/src/fundamentals/storage/providers/utils.ts b/packages/backend/server/src/fundamentals/storage/providers/utils.ts index a0eab7d8c5451..1f9fc52ed6e6e 100644 --- a/packages/backend/server/src/fundamentals/storage/providers/utils.ts +++ b/packages/backend/server/src/fundamentals/storage/providers/utils.ts @@ -24,7 +24,7 @@ export async function autoMetadata( try { // length if (!metadata.contentLength) { - metadata.contentLength = blob.length; + metadata.contentLength = blob.byteLength; } // checksum diff --git a/packages/backend/server/src/plugins/storage/providers/s3.ts b/packages/backend/server/src/plugins/storage/providers/s3.ts index 3a242704dbf28..3aabf11f747d6 100644 --- a/packages/backend/server/src/plugins/storage/providers/s3.ts +++ b/packages/backend/server/src/plugins/storage/providers/s3.ts @@ -140,7 +140,7 @@ export class S3StorageProvider implements StorageProvider { listResult.Contents.map(r => ({ key: r.Key!, lastModified: r.LastModified!, - size: r.Size!, + contentLength: r.Size!, })) ); } diff --git a/packages/backend/server/src/schema.gql b/packages/backend/server/src/schema.gql index 5d129eb89e1b4..58624ce58991e 100644 --- a/packages/backend/server/src/schema.gql +++ b/packages/backend/server/src/schema.gql @@ -411,6 +411,13 @@ input ListUserInput { skip: Int = 0 } +type ListedBlob { + createdAt: String! + key: String! + mime: String! + size: Int! +} + input ManageUserInput { """User email""" email: String @@ -457,7 +464,7 @@ type Mutation { """Create a new workspace""" createWorkspace(init: Upload): WorkspaceType! deleteAccount: DeleteAccount! - deleteBlob(hash: String!, workspaceId: String!): Boolean! + deleteBlob(hash: String @deprecated(reason: "use parameter [key]"), key: String, permanently: Boolean! = false, workspaceId: String!): Boolean! """Delete a user account""" deleteUser(id: String!): DeleteAccount! @@ -469,6 +476,7 @@ type Mutation { leaveWorkspace(sendLeaveMail: Boolean, workspaceId: String!, workspaceName: String!): Boolean! publishPage(mode: PublicPageMode = Page, pageId: String!, workspaceId: String!): WorkspacePage! recoverDoc(guid: String!, timestamp: DateTime!, workspaceId: String!): DateTime! + releaseDeletedBlobs(workspaceId: String!): Boolean! """Remove user avatar""" removeAvatar: RemoveAvatar! @@ -541,8 +549,7 @@ enum PublicPageMode { } type Query { - checkBlobSize(size: SafeInt!, workspaceId: String!): WorkspaceBlobSizes! @deprecated(reason: "no more needed") - collectAllBlobSizes: WorkspaceBlobSizes! @deprecated(reason: "use `user.storageUsage` instead") + collectAllBlobSizes: WorkspaceBlobSizes! @deprecated(reason: "use `user.quotaUsage` instead") """Get current user""" currentUser: UserType @@ -825,6 +832,10 @@ type UserQuotaHumanReadable { storageQuota: String! } +type UserQuotaUsage { + storageQuota: SafeInt! +} + type UserSubscription { canceledAt: DateTime createdAt: DateTime! @@ -874,6 +885,7 @@ type UserType { """User name""" name: String! quota: UserQuota + quotaUsage: UserQuotaUsage! subscription(plan: SubscriptionPlan = Pro): UserSubscription @deprecated(reason: "use `UserType.subscriptions`") subscriptions: [UserSubscription!]! token: tokenType! @deprecated(reason: "use [/api/auth/sign-in?native=true] instead") @@ -907,7 +919,7 @@ type WorkspaceType { availableFeatures: [FeatureType!]! """List blobs of workspace""" - blobs: [String!]! + blobs: [ListedBlob!]! """Blobs size of workspace""" blobsSize: Int! diff --git a/packages/backend/server/tests/utils/blobs.ts b/packages/backend/server/tests/utils/blobs.ts index 514fbeee468d1..f6832c41c401d 100644 --- a/packages/backend/server/tests/utils/blobs.ts +++ b/packages/backend/server/tests/utils/blobs.ts @@ -54,35 +54,16 @@ export async function collectAllBlobSizes( .send({ query: ` query { - collectAllBlobSizes { - size + currentUser { + quotaUsage { + storageQuota + } } } `, }) .expect(200); - return res.body.data.collectAllBlobSizes.size; -} - -export async function checkBlobSize( - app: INestApplication, - token: string, - workspaceId: string, - size: number -): Promise { - const res = await request(app.getHttpServer()) - .post(gql) - .auth(token, { type: 'bearer' }) - .send({ - query: `query checkBlobSize($workspaceId: String!, $size: SafeInt!) { - checkBlobSize(workspaceId: $workspaceId, size: $size) { - size - } - }`, - variables: { workspaceId, size }, - }) - .expect(200); - return res.body.data.checkBlobSize.size; + return res.body.data.currentUser.quotaUsage.storageQuota; } export async function setBlob( diff --git a/packages/backend/server/tests/workspace-blobs.spec.ts b/packages/backend/server/tests/workspace/blobs.e2e.ts similarity index 76% rename from packages/backend/server/tests/workspace-blobs.spec.ts rename to packages/backend/server/tests/workspace/blobs.e2e.ts index 3e81d5fbf7aff..08dc6947fce4d 100644 --- a/packages/backend/server/tests/workspace-blobs.spec.ts +++ b/packages/backend/server/tests/workspace/blobs.e2e.ts @@ -2,11 +2,10 @@ import type { INestApplication } from '@nestjs/common'; import test from 'ava'; import request from 'supertest'; -import { AppModule } from '../src/app.module'; -import { FeatureManagementService, FeatureType } from '../src/core/features'; -import { QuotaService, QuotaType } from '../src/core/quota'; +import { AppModule } from '../../src/app.module'; +import { FeatureManagementService, FeatureType } from '../../src/core/features'; +import { QuotaService, QuotaType } from '../../src/core/quota'; import { - checkBlobSize, collectAllBlobSizes, createTestingApp, createWorkspace, @@ -14,7 +13,7 @@ import { listBlobs, setBlob, signUp, -} from './utils'; +} from '../utils'; const OneMB = 1024 * 1024; @@ -114,58 +113,6 @@ test('should calc all blobs size', async t => { const size = await collectAllBlobSizes(app, u1.token.token); t.is(size, 8, 'failed to collect all blob sizes'); - - const size1 = await checkBlobSize( - app, - u1.token.token, - workspace1.id, - 10 * 1024 * 1024 * 1024 - 8 - ); - t.is(size1, 0, 'failed to check blob size'); - - const size2 = await checkBlobSize( - app, - u1.token.token, - workspace1.id, - 10 * 1024 * 1024 * 1024 - 7 - ); - t.is(size2, -1, 'failed to check blob size'); -}); - -test('should be able calc quota after switch plan', async t => { - const u1 = await signUp(app, 'darksky', 'darksky@affine.pro', '1'); - - const workspace1 = await createWorkspace(app, u1.token.token); - - const buffer1 = Buffer.from([0, 0]); - await setBlob(app, u1.token.token, workspace1.id, buffer1); - const buffer2 = Buffer.from([0, 1]); - await setBlob(app, u1.token.token, workspace1.id, buffer2); - - const workspace2 = await createWorkspace(app, u1.token.token); - - const buffer3 = Buffer.from([0, 0]); - await setBlob(app, u1.token.token, workspace2.id, buffer3); - const buffer4 = Buffer.from([0, 1]); - await setBlob(app, u1.token.token, workspace2.id, buffer4); - - const size1 = await checkBlobSize( - app, - u1.token.token, - workspace1.id, - 10 * 1024 * 1024 * 1024 - 8 - ); - t.is(size1, 0, 'failed to check free plan blob size'); - - await quota.switchUserQuota(u1.id, QuotaType.ProPlanV1); - - const size2 = await checkBlobSize( - app, - u1.token.token, - workspace1.id, - 100 * 1024 * 1024 * 1024 - 8 - ); - t.is(size2, 0, 'failed to check pro plan blob size'); }); test('should reject blob exceeded limit', async t => { diff --git a/packages/common/nbstore/package.json b/packages/common/nbstore/package.json index 5fc2f1083e002..dff744be4ba23 100644 --- a/packages/common/nbstore/package.json +++ b/packages/common/nbstore/package.json @@ -8,7 +8,8 @@ ".": "./src/index.ts", "./op": "./src/op/index.ts", "./idb": "./src/impls/idb/index.ts", - "./idb/v1": "./src/impls/idb/v1/index.ts" + "./idb/v1": "./src/impls/idb/v1/index.ts", + "./cloud": "./src/impls/cloud/index.ts" }, "dependencies": { "@toeverything/infra": "workspace:*", @@ -18,9 +19,13 @@ "yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch" }, "devDependencies": { - "idb": "^8.0.0" + "@affine/graphql": "workspace:*", + "idb": "^8.0.0", + "socket.io-client": "^4.7.5" }, "peerDependencies": { - "idb": "^8.0.0" + "@affine/graphql": "workspace:*", + "idb": "^8.0.0", + "socket.io-client": "^4.7.5" } } diff --git a/packages/common/nbstore/src/impls/cloud/blob.ts b/packages/common/nbstore/src/impls/cloud/blob.ts new file mode 100644 index 0000000000000..671687d75ced1 --- /dev/null +++ b/packages/common/nbstore/src/impls/cloud/blob.ts @@ -0,0 +1,72 @@ +import { + deleteBlobMutation, + gqlFetcherFactory, + listBlobsQuery, + releaseDeletedBlobsMutation, + setBlobMutation, +} from '@affine/graphql'; + +import { DummyConnection } from '../../connection'; +import { type BlobRecord, BlobStorage } from '../../storage'; + +export class CloudBlobStorage extends BlobStorage { + private readonly gql = gqlFetcherFactory(this.options.peer + '/graphql'); + override connection = new DummyConnection(); + + override async get(key: string) { + const res = await fetch( + this.options.peer + '/api/workspaces/' + this.spaceId + '/blobs/' + key, + { cache: 'default' } + ); + + if (!res.ok) { + return null; + } + + const data = await res.arrayBuffer(); + + return { + key, + data: new Uint8Array(data), + mime: res.headers.get('content-type') || '', + size: data.byteLength, + createdAt: new Date(res.headers.get('last-modified') || Date.now()), + }; + } + + override async set(blob: BlobRecord) { + await this.gql({ + query: setBlobMutation, + variables: { + workspaceId: this.spaceId, + blob: new File([blob.data], blob.key, { type: blob.mime }), + }, + }); + } + + override async delete(key: string, permanently: boolean) { + await this.gql({ + query: deleteBlobMutation, + variables: { workspaceId: this.spaceId, key, permanently }, + }); + } + + override async release() { + await this.gql({ + query: releaseDeletedBlobsMutation, + variables: { workspaceId: this.spaceId }, + }); + } + + override async list() { + const res = await this.gql({ + query: listBlobsQuery, + variables: { workspaceId: this.spaceId }, + }); + + return res.workspace.blobs.map(blob => ({ + ...blob, + createdAt: new Date(blob.createdAt), + })); + } +} diff --git a/packages/common/nbstore/src/impls/cloud/doc.ts b/packages/common/nbstore/src/impls/cloud/doc.ts new file mode 100644 index 0000000000000..b9f28c659b58c --- /dev/null +++ b/packages/common/nbstore/src/impls/cloud/doc.ts @@ -0,0 +1,177 @@ +import { noop } from 'lodash-es'; +import type { SocketOptions } from 'socket.io-client'; + +import { share } from '../../connection'; +import { + type DocClocks, + DocStorage, + type DocStorageOptions, + type DocUpdate, +} from '../../storage'; +import { + base64ToUint8Array, + type ServerEventsMap, + SocketConnection, + uint8ArrayToBase64, +} from './socket'; + +interface CloudDocStorageOptions extends DocStorageOptions { + socketOptions: SocketOptions; +} + +export class CloudDocStorage extends DocStorage { + connection = share( + new SocketConnection(this.peer, this.options.socketOptions) + ); + + private get socket() { + return this.connection.inner; + } + + override async connect(): Promise { + await super.connect(); + this.connection.onStatusChanged(status => { + if (status === 'connected') { + this.join().catch(noop); + this.socket.on('space:broadcast-doc-update', this.onServerUpdate); + } + }); + } + + override async disconnect(): Promise { + this.socket.emit('space:leave', { + spaceType: this.spaceType, + spaceId: this.spaceId, + }); + this.socket.off('space:broadcast-doc-update', this.onServerUpdate); + await super.connect(); + } + + async join() { + try { + const res = await this.socket.emitWithAck('space:join', { + spaceType: this.spaceType, + spaceId: this.spaceId, + clientVersion: BUILD_CONFIG.appVersion, + }); + + if ('error' in res) { + this.connection.setStatus('closed', new Error(res.error.message)); + } + } catch (e) { + this.connection.setStatus('error', e as Error); + } + } + + onServerUpdate: ServerEventsMap['space:broadcast-doc-update'] = message => { + if ( + this.spaceType === message.spaceType && + this.spaceId === message.spaceId + ) { + this.emit('update', { + docId: message.docId, + bin: base64ToUint8Array(message.update), + timestamp: new Date(message.timestamp), + editor: message.editor, + }); + } + }; + + override async getDocSnapshot(docId: string) { + const response = await this.socket.emitWithAck('space:load-doc', { + spaceType: this.spaceType, + spaceId: this.spaceId, + docId, + }); + + if ('error' in response) { + // TODO: use [UserFriendlyError] + throw new Error(response.error.message); + } + + return { + docId, + bin: base64ToUint8Array(response.data.missing), + timestamp: new Date(response.data.timestamp), + }; + } + + override async getDocDiff(docId: string, state?: Uint8Array) { + const response = await this.socket.emitWithAck('space:load-doc', { + spaceType: this.spaceType, + spaceId: this.spaceId, + docId, + stateVector: state ? await uint8ArrayToBase64(state) : void 0, + }); + + if ('error' in response) { + // TODO: use [UserFriendlyError] + throw new Error(response.error.message); + } + + return { + docId, + missing: base64ToUint8Array(response.data.missing), + state: base64ToUint8Array(response.data.state), + timestamp: new Date(response.data.timestamp), + }; + } + + override async pushDocUpdate(update: DocUpdate) { + const response = await this.socket.emitWithAck('space:push-doc-update', { + spaceType: this.spaceType, + spaceId: this.spaceId, + docId: update.docId, + updates: await uint8ArrayToBase64(update.bin), + }); + + if ('error' in response) { + // TODO(@forehalo): use [UserFriendlyError] + throw new Error(response.error.message); + } + + return { + docId: update.docId, + timestamp: new Date(response.data.timestamp), + }; + } + + override async getDocTimestamps(after?: Date) { + const response = await this.socket.emitWithAck( + 'space:load-doc-timestamps', + { + spaceType: this.spaceType, + spaceId: this.spaceId, + timestamp: after ? after.getTime() : undefined, + } + ); + + if ('error' in response) { + // TODO(@forehalo): use [UserFriendlyError] + throw new Error(response.error.message); + } + + return Object.entries(response.data).reduce((ret, [docId, timestamp]) => { + ret[docId] = new Date(timestamp); + return ret; + }, {} as DocClocks); + } + + override async deleteDoc(docId: string) { + this.socket.emit('space:delete-doc', { + spaceType: this.spaceType, + spaceId: this.spaceId, + docId, + }); + } + + protected async setDocSnapshot() { + return false; + } + protected async getDocUpdates() { + return []; + } + protected async markUpdatesMerged() { + return 0; + } +} diff --git a/packages/common/nbstore/src/impls/cloud/index.ts b/packages/common/nbstore/src/impls/cloud/index.ts new file mode 100644 index 0000000000000..d476ae6eb9b92 --- /dev/null +++ b/packages/common/nbstore/src/impls/cloud/index.ts @@ -0,0 +1,2 @@ +export * from './blob'; +export * from './doc'; diff --git a/packages/common/nbstore/src/impls/cloud/socket.ts b/packages/common/nbstore/src/impls/cloud/socket.ts new file mode 100644 index 0000000000000..6e84a433e372a --- /dev/null +++ b/packages/common/nbstore/src/impls/cloud/socket.ts @@ -0,0 +1,173 @@ +import { + Manager as SocketIOManager, + type Socket as SocketIO, + type SocketOptions, +} from 'socket.io-client'; + +import { Connection, type ConnectionStatus } from '../../connection'; + +// TODO(@forehalo): use [UserFriendlyError] +interface EventError { + name: string; + message: string; +} + +type WebsocketResponse = + | { + error: EventError; + } + | { + data: T; + }; + +interface ServerEvents { + 'space:broadcast-doc-update': { + spaceType: string; + spaceId: string; + docId: string; + update: string; + timestamp: number; + editor: string; + }; +} + +interface ClientEvents { + 'space:join': [ + { spaceType: string; spaceId: string; clientVersion: string }, + { clientId: string }, + ]; + 'space:leave': { spaceType: string; spaceId: string }; + 'space:join-awareness': [ + { + spaceType: string; + spaceId: string; + docId: string; + clientVersion: string; + }, + { clientId: string }, + ]; + 'space:leave-awareness': { + spaceType: string; + spaceId: string; + docId: string; + }; + + 'space:push-doc-update': [ + { spaceType: string; spaceId: string; docId: string; updates: string }, + { timestamp: number }, + ]; + 'space:load-doc-timestamps': [ + { + spaceType: string; + spaceId: string; + timestamp?: number; + }, + Record, + ]; + 'space:load-doc': [ + { + spaceType: string; + spaceId: string; + docId: string; + stateVector?: string; + }, + { + missing: string; + state: string; + timestamp: number; + }, + ]; + 'space:delete-doc': { spaceType: string; spaceId: string; docId: string }; +} + +export type ServerEventsMap = { + [Key in keyof ServerEvents]: (data: ServerEvents[Key]) => void; +}; + +export type ClientEventsMap = { + [Key in keyof ClientEvents]: ClientEvents[Key] extends Array + ? ( + data: ClientEvents[Key][0], + ack: (res: WebsocketResponse) => void + ) => void + : (data: ClientEvents[Key]) => void; +}; + +export type Socket = SocketIO; + +export function uint8ArrayToBase64(array: Uint8Array): Promise { + return new Promise(resolve => { + // Create a blob from the Uint8Array + const blob = new Blob([array]); + + const reader = new FileReader(); + reader.onload = function () { + const dataUrl = reader.result as string | null; + if (!dataUrl) { + resolve(''); + return; + } + // The result includes the `data:` URL prefix and the MIME type. We only want the Base64 data + const base64 = dataUrl.split(',')[1]; + resolve(base64); + }; + + reader.readAsDataURL(blob); + }); +} + +export function base64ToUint8Array(base64: string) { + const binaryString = atob(base64); + const binaryArray = binaryString.split('').map(function (char) { + return char.charCodeAt(0); + }); + return new Uint8Array(binaryArray); +} + +export class SocketConnection extends Connection { + manager = new SocketIOManager(this.endpoint, { + autoConnect: false, + transports: ['websocket'], + secure: new URL(this.endpoint).protocol === 'https:', + }); + + constructor( + private readonly endpoint: string, + private readonly socketOptions: SocketOptions + ) { + super(); + } + + override get shareId() { + return `socket:${this.endpoint}`; + } + + override async doConnect() { + const conn = this.manager.socket('/', this.socketOptions); + + await new Promise((resolve, reject) => { + conn.once('connect', () => { + resolve(); + }); + conn.once('connect_error', err => { + reject(err); + }); + conn.open(); + }); + + return conn; + } + + override async doDisconnect(conn: Socket) { + conn.close(); + } + + /** + * Socket connection allow explicitly set status by user + * + * used when join space failed + */ + override setStatus(status: ConnectionStatus, error?: Error) { + super.setStatus(status, error); + } +} diff --git a/packages/common/nbstore/src/impls/index.ts b/packages/common/nbstore/src/impls/index.ts index 8bce204798b51..bff2874ad4467 100644 --- a/packages/common/nbstore/src/impls/index.ts +++ b/packages/common/nbstore/src/impls/index.ts @@ -1,4 +1,5 @@ import type { Storage } from '../storage'; +import { CloudBlobStorage, CloudDocStorage } from './cloud'; import { IndexedDBBlobStorage, IndexedDBDocStorage, @@ -19,7 +20,9 @@ const idbv1: StorageConstructor[] = [ IndexedDBV1BlobStorage, ]; -export const storages: StorageConstructor[] = [...idbv1, ...idb]; +const cloud: StorageConstructor[] = [CloudDocStorage, CloudBlobStorage]; + +export const storages: StorageConstructor[] = cloud.concat(idbv1, idb); const AvailableStorageImplementations = storages.reduce( (acc, curr) => { diff --git a/packages/frontend/core/src/modules/cloud/stores/user-quota.ts b/packages/frontend/core/src/modules/cloud/stores/user-quota.ts index 0413d46dcb8b5..70698060aec26 100644 --- a/packages/frontend/core/src/modules/cloud/stores/user-quota.ts +++ b/packages/frontend/core/src/modules/cloud/stores/user-quota.ts @@ -23,7 +23,7 @@ export class UserQuotaStore extends Store { return { userId: data.currentUser.id, quota: data.currentUser.quota, - used: data.collectAllBlobSizes.size, + used: data.currentUser.quotaUsage.storageQuota, }; } } diff --git a/packages/frontend/core/src/modules/workspace-engine/impls/engine/blob-cloud.ts b/packages/frontend/core/src/modules/workspace-engine/impls/engine/blob-cloud.ts index 3eacf5dbfb691..e6549b0e05e8f 100644 --- a/packages/frontend/core/src/modules/workspace-engine/impls/engine/blob-cloud.ts +++ b/packages/frontend/core/src/modules/workspace-engine/impls/engine/blob-cloud.ts @@ -70,7 +70,7 @@ export class CloudBlobStorage implements BlobStorage { query: deleteBlobMutation, variables: { workspaceId: key, - hash: key, + key, }, }); } @@ -82,6 +82,6 @@ export class CloudBlobStorage implements BlobStorage { workspaceId: this.workspaceId, }, }); - return result.listBlobs; + return result.workspace.blobs.map(blob => blob.key); } } diff --git a/packages/frontend/graphql/src/graphql/blob-delete.gql b/packages/frontend/graphql/src/graphql/blob-delete.gql index 790c326465909..ab78a54d88c6a 100644 --- a/packages/frontend/graphql/src/graphql/blob-delete.gql +++ b/packages/frontend/graphql/src/graphql/blob-delete.gql @@ -1,3 +1,7 @@ -mutation deleteBlob($workspaceId: String!, $hash: String!) { - deleteBlob(workspaceId: $workspaceId, hash: $hash) +mutation deleteBlob( + $workspaceId: String! + $key: String! + $permanently: Boolean +) { + deleteBlob(workspaceId: $workspaceId, key: $key, permanently: $permanently) } diff --git a/packages/frontend/graphql/src/graphql/blob-list.gql b/packages/frontend/graphql/src/graphql/blob-list.gql index 67a19c718c1bb..5b314f0cc5cfa 100644 --- a/packages/frontend/graphql/src/graphql/blob-list.gql +++ b/packages/frontend/graphql/src/graphql/blob-list.gql @@ -1,3 +1,10 @@ query listBlobs($workspaceId: String!) { - listBlobs(workspaceId: $workspaceId) + workspace(id: $workspaceId) { + blobs { + key + size + mime + createdAt + } + } } diff --git a/packages/frontend/graphql/src/graphql/blob-release-deleted.gql b/packages/frontend/graphql/src/graphql/blob-release-deleted.gql new file mode 100644 index 0000000000000..d51d0ef7ca68d --- /dev/null +++ b/packages/frontend/graphql/src/graphql/blob-release-deleted.gql @@ -0,0 +1,3 @@ +mutation releaseDeletedBlobs($workspaceId: String!) { + releaseDeletedBlobs(workspaceId: $workspaceId) +} diff --git a/packages/frontend/graphql/src/graphql/index.ts b/packages/frontend/graphql/src/graphql/index.ts index 2bc168d6cf542..e94ec76fdf98e 100644 --- a/packages/frontend/graphql/src/graphql/index.ts +++ b/packages/frontend/graphql/src/graphql/index.ts @@ -47,19 +47,37 @@ export const deleteBlobMutation = { definitionName: 'deleteBlob', containsFile: false, query: ` -mutation deleteBlob($workspaceId: String!, $hash: String!) { - deleteBlob(workspaceId: $workspaceId, hash: $hash) +mutation deleteBlob($workspaceId: String!, $key: String!, $permanently: Boolean) { + deleteBlob(workspaceId: $workspaceId, key: $key, permanently: $permanently) }`, }; export const listBlobsQuery = { id: 'listBlobsQuery' as const, operationName: 'listBlobs', - definitionName: 'listBlobs', + definitionName: 'workspace', containsFile: false, query: ` query listBlobs($workspaceId: String!) { - listBlobs(workspaceId: $workspaceId) + workspace(id: $workspaceId) { + blobs { + key + size + mime + createdAt + } + } +}`, +}; + +export const releaseDeletedBlobsMutation = { + id: 'releaseDeletedBlobsMutation' as const, + operationName: 'releaseDeletedBlobs', + definitionName: 'releaseDeletedBlobs', + containsFile: false, + query: ` +mutation releaseDeletedBlobs($workspaceId: String!) { + releaseDeletedBlobs(workspaceId: $workspaceId) }`, }; @@ -836,7 +854,7 @@ mutation publishPage($workspaceId: String!, $pageId: String!, $mode: PublicPageM export const quotaQuery = { id: 'quotaQuery' as const, operationName: 'quota', - definitionName: 'currentUser,collectAllBlobSizes', + definitionName: 'currentUser', containsFile: false, query: ` query quota { @@ -856,9 +874,9 @@ query quota { memberLimit } } - } - collectAllBlobSizes { - size + quotaUsage { + storageQuota + } } }`, }; diff --git a/packages/frontend/graphql/src/graphql/quota.gql b/packages/frontend/graphql/src/graphql/quota.gql index c0268b6443584..682c2e5bc3550 100644 --- a/packages/frontend/graphql/src/graphql/quota.gql +++ b/packages/frontend/graphql/src/graphql/quota.gql @@ -15,8 +15,8 @@ query quota { memberLimit } } - } - collectAllBlobSizes { - size + quotaUsage { + storageQuota + } } } diff --git a/packages/frontend/graphql/src/schema.ts b/packages/frontend/graphql/src/schema.ts index 039a08c63f1b8..3f0a42a862b3c 100644 --- a/packages/frontend/graphql/src/schema.ts +++ b/packages/frontend/graphql/src/schema.ts @@ -471,6 +471,14 @@ export interface ListUserInput { skip: InputMaybe; } +export interface ListedBlob { + __typename?: 'ListedBlob'; + createdAt: Scalars['String']['output']; + key: Scalars['String']['output']; + mime: Scalars['String']['output']; + size: Scalars['Int']['output']; +} + export interface ManageUserInput { /** User email */ email: InputMaybe; @@ -519,6 +527,7 @@ export interface Mutation { leaveWorkspace: Scalars['Boolean']['output']; publishPage: WorkspacePage; recoverDoc: Scalars['DateTime']['output']; + releaseDeletedBlobs: Scalars['Boolean']['output']; /** Remove user avatar */ removeAvatar: RemoveAvatar; removeWorkspaceFeature: Scalars['Int']['output']; @@ -616,7 +625,9 @@ export interface MutationCreateWorkspaceArgs { } export interface MutationDeleteBlobArgs { - hash: Scalars['String']['input']; + hash: InputMaybe; + key: InputMaybe; + permanently?: Scalars['Boolean']['input']; workspaceId: Scalars['String']['input']; } @@ -657,6 +668,10 @@ export interface MutationRecoverDocArgs { workspaceId: Scalars['String']['input']; } +export interface MutationReleaseDeletedBlobsArgs { + workspaceId: Scalars['String']['input']; +} + export interface MutationRemoveWorkspaceFeatureArgs { feature: FeatureType; workspaceId: Scalars['String']['input']; @@ -802,9 +817,7 @@ export enum PublicPageMode { export interface Query { __typename?: 'Query'; - /** @deprecated no more needed */ - checkBlobSize: WorkspaceBlobSizes; - /** @deprecated use `user.storageUsage` instead */ + /** @deprecated use `user.quotaUsage` instead */ collectAllBlobSizes: WorkspaceBlobSizes; /** Get current user */ currentUser: Maybe; @@ -843,11 +856,6 @@ export interface Query { workspaces: Array; } -export interface QueryCheckBlobSizeArgs { - size: Scalars['SafeInt']['input']; - workspaceId: Scalars['String']['input']; -} - export interface QueryErrorArgs { name: ErrorNames; } @@ -1125,6 +1133,11 @@ export interface UserQuotaHumanReadable { storageQuota: Scalars['String']['output']; } +export interface UserQuotaUsage { + __typename?: 'UserQuotaUsage'; + storageQuota: Scalars['SafeInt']['output']; +} + export interface UserSubscription { __typename?: 'UserSubscription'; canceledAt: Maybe; @@ -1171,6 +1184,7 @@ export interface UserType { /** User name */ name: Scalars['String']['output']; quota: Maybe; + quotaUsage: UserQuotaUsage; /** @deprecated use `UserType.subscriptions` */ subscription: Maybe; subscriptions: Array; @@ -1223,7 +1237,7 @@ export interface WorkspaceType { /** Available features of workspace */ availableFeatures: Array; /** List blobs of workspace */ - blobs: Array; + blobs: Array; /** Blobs size of workspace */ blobsSize: Scalars['Int']['output']; /** Workspace created date */ @@ -1313,7 +1327,8 @@ export type AdminServerConfigQuery = { export type DeleteBlobMutationVariables = Exact<{ workspaceId: Scalars['String']['input']; - hash: Scalars['String']['input']; + key: Scalars['String']['input']; + permanently: InputMaybe; }>; export type DeleteBlobMutation = { @@ -1325,7 +1340,28 @@ export type ListBlobsQueryVariables = Exact<{ workspaceId: Scalars['String']['input']; }>; -export type ListBlobsQuery = { __typename?: 'Query'; listBlobs: Array }; +export type ListBlobsQuery = { + __typename?: 'Query'; + workspace: { + __typename?: 'WorkspaceType'; + blobs: Array<{ + __typename?: 'ListedBlob'; + key: string; + size: number; + mime: string; + createdAt: string; + }>; + }; +}; + +export type ReleaseDeletedBlobsMutationVariables = Exact<{ + workspaceId: Scalars['String']['input']; +}>; + +export type ReleaseDeletedBlobsMutation = { + __typename?: 'Mutation'; + releaseDeletedBlobs: boolean; +}; export type SetBlobMutationVariables = Exact<{ workspaceId: Scalars['String']['input']; @@ -2053,8 +2089,8 @@ export type QuotaQuery = { memberLimit: string; }; } | null; + quotaUsage: { __typename?: 'UserQuotaUsage'; storageQuota: number }; } | null; - collectAllBlobSizes: { __typename?: 'WorkspaceBlobSizes'; size: number }; }; export type RecoverDocMutationVariables = Exact<{ @@ -2680,6 +2716,11 @@ export type Mutations = variables: DeleteBlobMutationVariables; response: DeleteBlobMutation; } + | { + name: 'releaseDeletedBlobsMutation'; + variables: ReleaseDeletedBlobsMutationVariables; + response: ReleaseDeletedBlobsMutation; + } | { name: 'setBlobMutation'; variables: SetBlobMutationVariables; diff --git a/yarn.lock b/yarn.lock index 89190fd7486bd..f834b82585116 100644 --- a/yarn.lock +++ b/yarn.lock @@ -727,14 +727,18 @@ __metadata: version: 0.0.0-use.local resolution: "@affine/nbstore@workspace:packages/common/nbstore" dependencies: + "@affine/graphql": "workspace:*" "@toeverything/infra": "workspace:*" eventemitter2: "npm:^6.4.9" idb: "npm:^8.0.0" lodash-es: "npm:^4.17.21" rxjs: "npm:^7.8.1" + socket.io-client: "npm:^4.7.5" yjs: "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch" peerDependencies: + "@affine/graphql": "workspace:*" idb: ^8.0.0 + socket.io-client: ^4.7.5 languageName: unknown linkType: soft